From 901d4712a60d60c5ae003739204f6f3db59071a7 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 3 Nov 2020 10:46:29 +0300 Subject: [PATCH 1/2] WIP --- .../apache/ignite/IgniteSystemProperties.java | 37 ++ .../processors/cache/GridCacheAdapter.java | 60 ++- .../transactions/TransactionProxyImpl.java | 3 + .../internal/processors/tracing/SpanType.java | 8 +- .../GridTracingConfigurationManager.java | 4 + .../internal/tracing/TracingSpiType.java | 5 +- .../tcp/internal/GridNioServerWrapper.java | 2 +- .../FilePerformanceStatisticsWriter.java | 486 ++++++++++++++++++ .../ignite/spi/tracing/OperationType.java | 173 +++++++ .../PerformanceStatisticsTracingSpi.java | 257 +++++++++ .../org/apache/ignite/spi/tracing/Scope.java | 5 +- .../tracing/TracingConfigurationManager.java | 10 + .../internal/processors/PerfStatTest.java | 122 +++++ 13 files changed, 1146 insertions(+), 26 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/tracing/FilePerformanceStatisticsWriter.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/tracing/OperationType.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/tracing/PerformanceStatisticsTracingSpi.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/PerfStatTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 949e9c6f6ed7c4..a9fd0fc13a246b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -41,6 +41,7 @@ import org.apache.ignite.lang.IgniteExperimental; import org.apache.ignite.mxbean.MetricsMxBean; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.tracing.FilePerformanceStatisticsWriter; import org.apache.ignite.stream.StreamTransformer; import org.jetbrains.annotations.Nullable; @@ -137,6 +138,10 @@ import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY; import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_NODE_IDS_HISTORY_SIZE; import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL; +import static org.apache.ignite.spi.tracing.FilePerformanceStatisticsWriter.DFLT_BUFFER_SIZE; +import static org.apache.ignite.spi.tracing.FilePerformanceStatisticsWriter.DFLT_CACHED_STRINGS_THRESHOLD; +import static org.apache.ignite.spi.tracing.FilePerformanceStatisticsWriter.DFLT_FILE_MAX_SIZE; +import static org.apache.ignite.spi.tracing.FilePerformanceStatisticsWriter.DFLT_FLUSH_SIZE; import static org.apache.ignite.startup.cmdline.CommandLineStartup.DFLT_PROG_NAME; /** @@ -1935,6 +1940,38 @@ public final class IgniteSystemProperties { defaults = "" + DFLT_DUMP_TX_COLLISIONS_INTERVAL) public static final String IGNITE_DUMP_TX_COLLISIONS_INTERVAL = "IGNITE_DUMP_TX_COLLISIONS_INTERVAL"; + /** + * Performance statistics maximum file size in bytes. Performance statistics will be stopped when the size exceeded. + * The default value is {@link FilePerformanceStatisticsWriter#DFLT_FILE_MAX_SIZE}. + */ + @SystemProperty(value = "Performance statistics maximum file size in bytes. Performance statistics will be " + + "stopped when the size exceeded", type = Long.class, defaults = "" + DFLT_FILE_MAX_SIZE) + public static final String IGNITE_PERF_STAT_FILE_MAX_SIZE = "IGNITE_PERF_STAT_FILE_MAX_SIZE"; + + /** + * Performance statistics off heap buffer size in bytes. The default value is + * {@link FilePerformanceStatisticsWriter#DFLT_BUFFER_SIZE}. + */ + @SystemProperty(value = "Performance statistics off heap buffer size in bytes", type = Integer.class, + defaults = "" + DFLT_BUFFER_SIZE) + public static final String IGNITE_PERF_STAT_BUFFER_SIZE = "IGNITE_PERF_STAT_BUFFER_SIZE"; + + /** + * Performance statistics minimal batch size to flush in bytes. The default value is + * {@link FilePerformanceStatisticsWriter#DFLT_FLUSH_SIZE}. + */ + @SystemProperty(value = "Performance statistics minimal batch size to flush in bytes", type = Integer.class, + defaults = "" + DFLT_FLUSH_SIZE) + public static final String IGNITE_PERF_STAT_FLUSH_SIZE = "IGNITE_PERF_STAT_FLUSH_SIZE"; + + /** + * Performance statistics maximum cached strings threshold. String caching will stop on threshold excess. + * The default value is {@link FilePerformanceStatisticsWriter#DFLT_CACHED_STRINGS_THRESHOLD}. + */ + @SystemProperty(value = "Performance statistics maximum cached strings threshold. String caching will stop on " + + "threshold excess", type = Integer.class, defaults = "" + DFLT_CACHED_STRINGS_THRESHOLD) + public static final String IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD = "IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD"; + /** * Enforces singleton. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index fa956bc9fc00a2..5192cdcd66f001 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -110,6 +110,8 @@ import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.processors.tracing.MTC; +import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; @@ -169,6 +171,8 @@ import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; +import static org.apache.ignite.internal.processors.tracing.SpanType.CACHE_GET; +import static org.apache.ignite.internal.processors.tracing.SpanType.CACHE_PUT; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; @@ -1467,27 +1471,33 @@ private boolean evictx(K key, GridCacheVersion ver, @Nullable @Override public V get(K key) throws IgniteCheckedException { A.notNull(key, "key"); - boolean statsEnabled = ctx.statisticsEnabled(); + Span span = ctx.kernalContext().tracing().create(CACHE_GET, MTC.span()); - long start = statsEnabled ? System.nanoTime() : 0L; + try (MTC.TraceSurroundings ignored = MTC.support(span)) { + span.addTag("cacheId", () -> String.valueOf(ctx.cacheId())); - boolean keepBinary = ctx.keepBinary(); + boolean statsEnabled = ctx.statisticsEnabled(); - if (keepBinary) - key = (K)ctx.toCacheKeyObject(key); + long start = statsEnabled ? System.nanoTime() : 0L; - V val = repairableGet(key, !keepBinary, false); + boolean keepBinary = ctx.keepBinary(); - if (ctx.config().getInterceptor() != null) { - key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key, true, false) : key; + if (keepBinary) + key = (K)ctx.toCacheKeyObject(key); - val = (V)ctx.config().getInterceptor().onGet(key, val); - } + V val = repairableGet(key, !keepBinary, false); - if (statsEnabled) - metrics0().addGetTimeNanos(System.nanoTime() - start); + if (ctx.config().getInterceptor() != null) { + key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key, true, false) : key; - return val; + val = (V)ctx.config().getInterceptor().onGet(key, val); + } + + if (statsEnabled) + metrics0().addGetTimeNanos(System.nanoTime() - start); + + return val; + } } /** {@inheritDoc} */ @@ -2565,21 +2575,27 @@ public IgniteInternalFuture getAndPutAsync0(final K key, */ public boolean put(final K key, final V val, final CacheEntryPredicate filter) throws IgniteCheckedException { - boolean statsEnabled = ctx.statisticsEnabled(); + Span span = ctx.kernalContext().tracing().create(CACHE_PUT, MTC.span()); - long start = statsEnabled ? System.nanoTime() : 0L; + try (MTC.TraceSurroundings ignored = MTC.support(span)) { + span.addTag("cacheId", () -> String.valueOf(ctx.cacheId())); - A.notNull(key, "key", val, "val"); + boolean statsEnabled = ctx.statisticsEnabled(); - if (keyCheck) - validateCacheKey(key); + long start = statsEnabled ? System.nanoTime() : 0L; - boolean stored = put0(key, val, filter); + A.notNull(key, "key", val, "val"); - if (statsEnabled && stored) - metrics0().addPutTimeNanos(System.nanoTime() - start); + if (keyCheck) + validateCacheKey(key); - return stored; + boolean stored = put0(key, val, filter); + + if (statsEnabled && stored) + metrics0().addPutTimeNanos(System.nanoTime() - start); + + return stored; + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 3cc390e147737e..b62c1a5408d5ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -356,6 +356,9 @@ private void leave() { @Override public void close() { Span span = MTC.span(); + span.addTag("commited", () -> String.valueOf(!tx.isRollbackOnly())); + span.addTag("cacheIds", () -> tx.txState().cacheIds().toString()); + try (TraceSurroundings ignored = MTC.support(cctx.kernalContext().tracing().create(TX_CLOSE, span))) { enter(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/SpanType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/SpanType.java index 693bb9630e4954..f464abb6fd33a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/SpanType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/SpanType.java @@ -234,7 +234,13 @@ public enum SpanType { SQL_CACHE_UPDATE(Scope.SQL, "sql.cache.update", 69), /** Processing of incoming batch. */ - SQL_BATCH_PROCESS(Scope.SQL, "sql.batch.process", 70); + SQL_BATCH_PROCESS(Scope.SQL, "sql.batch.process", 70), + + /** */ + CACHE_GET(Scope.CACHE_API, "cache.get", 71, true), + + /** */ + CACHE_PUT(Scope.CACHE_API, "cache.put", 72, true); /** Scope */ private Scope scope; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/configuration/GridTracingConfigurationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/configuration/GridTracingConfigurationManager.java index 9ffd33f9606154..179556ed9412ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/configuration/GridTracingConfigurationManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/configuration/GridTracingConfigurationManager.java @@ -84,6 +84,10 @@ public class GridTracingConfigurationManager implements TracingConfigurationMana new TracingConfigurationCoordinates.Builder(Scope.SQL).build(), TracingConfigurationManager.DEFAULT_SQL_CONFIGURATION); + tmpDfltConfigurationMap.put( + new TracingConfigurationCoordinates.Builder(Scope.CACHE_API).build(), + TracingConfigurationManager.DEFAULT_CACHE_API_CONFIGURATION); + DEFAULT_CONFIGURATION_MAP = Collections.unmodifiableMap(tmpDfltConfigurationMap); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/tracing/TracingSpiType.java b/modules/core/src/main/java/org/apache/ignite/internal/tracing/TracingSpiType.java index b070e1b86e5751..f50450eb8c26ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/tracing/TracingSpiType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/tracing/TracingSpiType.java @@ -25,7 +25,10 @@ public enum TracingSpiType { NOOP_TRACING_SPI((byte)0), /** */ - OPEN_CENSUS_TRACING_SPI((byte)1); + OPEN_CENSUS_TRACING_SPI((byte)1), + + /** */ + PERFORMANCE_STATISTICS_TRACING_SPI((byte)2); /** Byte index of a tracing spi instance. */ private final byte idx; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java index 33f8c498e037d2..12848e5db569ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java @@ -881,7 +881,7 @@ public GridNioServer resetNioServer() throws IgniteCheckedException { List filters = new ArrayList<>(); - if (tracing instanceof GridTracingManager && ((GridManager)tracing).enabled()) + if (tracing instanceof GridTracingManager && ((GridManager)tracing).enabled() && false) filters.add(new GridNioTracerFilter(log, tracing)); filters.add(new GridNioCodecFilter(parser, log, true)); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/tracing/FilePerformanceStatisticsWriter.java b/modules/core/src/main/java/org/apache/ignite/spi/tracing/FilePerformanceStatisticsWriter.java new file mode 100644 index 00000000000000..d1667a21e05d0a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/tracing/FilePerformanceStatisticsWriter.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.tracing; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.GridIntIterator; +import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.thread.IgniteThread; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_BUFFER_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_FILE_MAX_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERF_STAT_FLUSH_SIZE; +import static org.apache.ignite.spi.tracing.OperationType.JOB; +import static org.apache.ignite.spi.tracing.OperationType.QUERY; +import static org.apache.ignite.spi.tracing.OperationType.QUERY_READS; +import static org.apache.ignite.spi.tracing.OperationType.TASK; +import static org.apache.ignite.spi.tracing.OperationType.TX_COMMIT; +import static org.apache.ignite.spi.tracing.OperationType.TX_ROLLBACK; +import static org.apache.ignite.spi.tracing.OperationType.cacheRecordSize; +import static org.apache.ignite.spi.tracing.OperationType.jobRecordSize; +import static org.apache.ignite.spi.tracing.OperationType.queryReadsRecordSize; +import static org.apache.ignite.spi.tracing.OperationType.queryRecordSize; +import static org.apache.ignite.spi.tracing.OperationType.taskRecordSize; +import static org.apache.ignite.spi.tracing.OperationType.transactionRecordSize; + +/** + * Performance statistics writer based on logging to a file. + *

+ * Each node collects statistics to a file placed under {@link #PERF_STAT_DIR}. + *

+ */ +public class FilePerformanceStatisticsWriter { + /** Directory to store performance statistics files. Placed under Ignite work directory. */ + public static final String PERF_STAT_DIR = "perf_stat"; + + /** Default maximum file size in bytes. Performance statistics will be stopped when the size exceeded. */ + public static final long DFLT_FILE_MAX_SIZE = 32 * U.GB; + + /** Default off heap buffer size in bytes. */ + public static final int DFLT_BUFFER_SIZE = (int)(32 * U.MB); + + /** Default minimal batch size to flush in bytes. */ + public static final int DFLT_FLUSH_SIZE = (int)(8 * U.MB); + + /** Default maximum cached strings threshold. String caching will stop on threshold excess. */ + public static final int DFLT_CACHED_STRINGS_THRESHOLD = 1024; + + /** File writer thread name. */ + static final String WRITER_THREAD_NAME = "performance-statistics-writer"; + + /** Minimal batch size to flush in bytes. */ + private final int flushSize = + IgniteSystemProperties.getInteger(IGNITE_PERF_STAT_FLUSH_SIZE, DFLT_FLUSH_SIZE); + + /** Maximum cached strings threshold. String caching will stop on threshold excess. */ + private final int cachedStrsThreshold = + IgniteSystemProperties.getInteger(IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD, DFLT_CACHED_STRINGS_THRESHOLD); + + /** Factory to provide I/O interface. */ + private final FileIOFactory fileIoFactory = new RandomAccessFileIOFactory(); + + /** Performance statistics file I/O. */ + private final FileIO fileIo; + + /** Performance statistics file writer worker. */ + private final FileWriter fileWriter; + + /** File writer thread started flag. */ + private boolean started; + + /** File write buffer. */ + private final SegmentedRingByteBuffer ringByteBuf; + + /** Count of written to buffer bytes. */ + private final AtomicInteger writtenToBuf = new AtomicInteger(); + + /** {@code True} if the small buffer warning message logged. */ + private final AtomicBoolean smallBufLogged = new AtomicBoolean(); + + /** {@code True} if worker stopped due to maximum file size reached. */ + private final AtomicBoolean stopByMaxSize = new AtomicBoolean(); + + /** Logger. */ + private final IgniteLogger log; + + /** Hashcodes of cached strings. */ + private final Set knownStrs = new GridConcurrentHashSet<>(); + + /** Count of cached strings. */ + private volatile int knownStrsSz; + + /** @param ctx Kernal context. */ + public FilePerformanceStatisticsWriter(GridKernalContext ctx) throws IgniteCheckedException, IOException { + log = ctx.log(getClass()); + + File file = resolveStatisticsFile(ctx); + + fileIo = fileIoFactory.create(file); + + log.info("Performance statistics file created [file=" + file.getAbsolutePath() + ']'); + + long fileMaxSize = IgniteSystemProperties.getLong(IGNITE_PERF_STAT_FILE_MAX_SIZE, DFLT_FILE_MAX_SIZE); + int bufSize = IgniteSystemProperties.getInteger(IGNITE_PERF_STAT_BUFFER_SIZE, DFLT_BUFFER_SIZE); + + ringByteBuf = new SegmentedRingByteBuffer(bufSize, fileMaxSize, SegmentedRingByteBuffer.BufferMode.DIRECT); + + fileWriter = new FileWriter(ctx, log); + } + + /** Starts collecting performance statistics. */ + public synchronized void start() { + assert !started; + + new IgniteThread(fileWriter).start(); + + started = true; + } + + /** Stops collecting performance statistics. */ + public synchronized void stop() { + assert started; + + // Stop accepting new records. + ringByteBuf.close(); + + U.awaitForWorkersStop(Collections.singleton(fileWriter), true, log); + + // Make sure that all producers released their buffers to safe deallocate memory (in case of worker + // stopped abnormally). + ringByteBuf.poll(); + + ringByteBuf.free(); + + try { + fileIo.force(); + } + catch (IOException e) { + log.warning("Failed to fsync the performance statistics file.", e); + } + + U.closeQuiet(fileIo); + + knownStrs.clear(); + + started = false; + } + + /** + * @param type Operation type. + * @param cacheId Cache id. + * @param startTime Start time in milliseconds. + * @param duration Duration in nanoseconds. + */ + public void cacheOperation(OperationType type, int cacheId, long startTime, long duration) { + doWrite(type, cacheRecordSize(), buf -> { + buf.putInt(cacheId); + buf.putLong(startTime); + buf.putLong(duration); + }); + } + + /** + * @param cacheIds Cache IDs. + * @param startTime Start time in milliseconds. + * @param duration Duration in nanoseconds. + * @param commited {@code True} if commited. + */ + public void transaction(GridIntList cacheIds, long startTime, long duration, boolean commited) { + doWrite(commited ? TX_COMMIT : TX_ROLLBACK, transactionRecordSize(cacheIds.size()), buf -> { + buf.putInt(cacheIds.size()); + + GridIntIterator iter = cacheIds.iterator(); + + while (iter.hasNext()) + buf.putInt(iter.next()); + + buf.putLong(startTime); + buf.putLong(duration); + }); + } + + /** + * @param type Cache query type. + * @param text Query text in case of SQL query. Cache name in case of SCAN query. + * @param id Query id. + * @param startTime Start time in milliseconds. + * @param duration Duration in nanoseconds. + * @param success Success flag. + */ + public void query(GridCacheQueryType type, String text, long id, long startTime, long duration, boolean success) { + boolean cached = cacheIfPossible(text); + + doWrite(QUERY, queryRecordSize(cached ? 0 : text.getBytes().length, cached), buf -> { + writeString(buf, text, cached); + buf.put((byte)type.ordinal()); + buf.putLong(id); + buf.putLong(startTime); + buf.putLong(duration); + buf.put(success ? (byte)1 : 0); + }); + } + + /** + * @param type Cache query type. + * @param queryNodeId Originating node id. + * @param id Query id. + * @param logicalReads Number of logical reads. + * @param physicalReads Number of physical reads. + */ + public void queryReads(GridCacheQueryType type, UUID queryNodeId, long id, long logicalReads, long physicalReads) { + doWrite(QUERY_READS, queryReadsRecordSize(), buf -> { + buf.put((byte)type.ordinal()); + writeUuid(buf, queryNodeId); + buf.putLong(id); + buf.putLong(logicalReads); + buf.putLong(physicalReads); + }); + } + + /** + * @param sesId Session id. + * @param taskName Task name. + * @param startTime Start time in milliseconds. + * @param duration Duration. + * @param affPartId Affinity partition id. + */ + public void task(IgniteUuid sesId, String taskName, long startTime, long duration, int affPartId) { + boolean cached = cacheIfPossible(taskName); + + doWrite(TASK, taskRecordSize(cached ? 0 : taskName.getBytes().length, cached), buf -> { + writeString(buf, taskName, cached); + writeIgniteUuid(buf, sesId); + buf.putLong(startTime); + buf.putLong(duration); + buf.putInt(affPartId); + }); + } + + /** + * @param sesId Session id. + * @param queuedTime Time job spent on waiting queue. + * @param startTime Start time in milliseconds. + * @param duration Job execution time. + * @param timedOut {@code True} if job is timed out. + */ + public void job(IgniteUuid sesId, long queuedTime, long startTime, long duration, boolean timedOut) { + doWrite(JOB, jobRecordSize(), buf -> { + writeIgniteUuid(buf, sesId); + buf.putLong(queuedTime); + buf.putLong(startTime); + buf.putLong(duration); + buf.put(timedOut ? (byte)1 : 0); + }); + } + + /** + * @param op Operation type. + * @param recSize Record size. + * @param writer Record writer. + */ + private void doWrite(OperationType op, int recSize, Consumer writer) { + int size = recSize + /*type*/ 1; + + SegmentedRingByteBuffer.WriteSegment seg = ringByteBuf.offer(size); + + if (seg == null) { + if (smallBufLogged.compareAndSet(false, true)) { + log.warning("The performance statistics in-memory buffer size is too small. Some operations " + + "will not be logged."); + } + + return; + } + + // Ring buffer closed (writer stopping) or maximum size reached. + if (seg.buffer() == null) { + seg.release(); + + if (!fileWriter.isCancelled() && stopByMaxSize.compareAndSet(false, true)) + log.warning("The performance statistics file maximum size is reached."); + + return; + } + + ByteBuffer buf = seg.buffer(); + + buf.put(op.id()); + + writer.accept(buf); + + seg.release(); + + int bufCnt = writtenToBuf.get() / flushSize; + + if (writtenToBuf.addAndGet(size) / flushSize > bufCnt) { + // Wake up worker to start writing data to the file. + synchronized (fileWriter) { + fileWriter.notify(); + } + } + } + + /** @return Performance statistics file. */ + private static File resolveStatisticsFile(GridKernalContext ctx) throws IgniteCheckedException { + String igniteWorkDir = U.workDirectory(ctx.config().getWorkDirectory(), ctx.config().getIgniteHome()); + + File fileDir = U.resolveWorkDirectory(igniteWorkDir, PERF_STAT_DIR, false); + + File file = new File(fileDir, "node-" + ctx.localNodeId() + ".prf");; + + int idx = 0; + + while (file.exists()) { + idx++; + + file = new File(fileDir, "node-" + ctx.localNodeId() + '-' + idx + ".prf"); + } + + return file; + } + + /** Writes {@link UUID} to buffer. */ + private static void writeUuid(ByteBuffer buf, UUID uuid) { + buf.putLong(uuid.getMostSignificantBits()); + buf.putLong(uuid.getLeastSignificantBits()); + } + + /** Writes {@link IgniteUuid} to buffer. */ + static void writeIgniteUuid(ByteBuffer buf, IgniteUuid uuid) { + buf.putLong(uuid.globalId().getMostSignificantBits()); + buf.putLong(uuid.globalId().getLeastSignificantBits()); + buf.putLong(uuid.localId()); + } + + /** + * @param buf Buffer to write to. + * @param str String to write. + * @param cached {@code True} if string cached. + */ + static void writeString(ByteBuffer buf, String str, boolean cached) { + buf.put(cached ? (byte)1 : 0); + + if (cached) + buf.putInt(str.hashCode()); + else { + byte[] bytes = str.getBytes(); + + buf.putInt(bytes.length); + buf.put(bytes); + } + } + + /** @return {@code True} if string was cached and can be written as hashcode. */ + private boolean cacheIfPossible(String str) { + if (knownStrsSz >= cachedStrsThreshold) + return false; + + int hash = str.hashCode(); + + // We can cache slightly more strings then threshold value. + // Don't implement solution with synchronization here, because our primary goal is avoid any contention. + if (knownStrs.contains(hash) || !knownStrs.add(hash)) + return true; + + knownStrsSz = knownStrs.size(); + + return false; + } + + /** Worker to write to performance statistics file. */ + private class FileWriter extends GridWorker { + /** + * @param ctx Kernal context. + * @param log Logger. + */ + FileWriter(GridKernalContext ctx, IgniteLogger log) { + super(ctx.igniteInstanceName(), WRITER_THREAD_NAME, log, ctx.workersRegistry()); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + try { + long writtenToFile = 0; + + while (!isCancelled()) { + blockingSectionBegin(); + + try { + synchronized (this) { + if (writtenToFile / flushSize == writtenToBuf.get() / flushSize) + wait(); + } + } + finally { + blockingSectionEnd(); + } + + writtenToFile += flush(); + } + + flush(); + } + catch (InterruptedException e) { + try { + flush(); + } + catch (IOException ignored) { + // No-op. + } + } + catch (ClosedByInterruptException ignored) { + // No-op. + } + catch (IOException e) { + log.error("Unable to write to the performance statistics file.", e); + } + } + + /** + * Flushes to disk available bytes from the ring buffer. + * + * @return Count of written bytes. + */ + private int flush() throws IOException { + List segs = ringByteBuf.poll(); + + if (segs == null) + return 0; + + int written = 0; + + for (SegmentedRingByteBuffer.ReadSegment seg : segs) { + updateHeartbeat(); + + try { + written += fileIo.writeFully(seg.buffer()); + } + finally { + seg.release(); + } + } + + return written; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/tracing/OperationType.java b/modules/core/src/main/java/org/apache/ignite/spi/tracing/OperationType.java new file mode 100644 index 00000000000000..ba2b08137e4fd2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/tracing/OperationType.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.tracing; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import org.jetbrains.annotations.Nullable; + +/** + * Performance statistics operation type. + */ +public enum OperationType { + /** Cache get. */ + CACHE_GET(0), + + /** Cache put. */ + CACHE_PUT(1), + + /** Cache remove. */ + CACHE_REMOVE(2), + + /** Cache get and put. */ + CACHE_GET_AND_PUT(3), + + /** Cache get and remove. */ + CACHE_GET_AND_REMOVE(4), + + /** Cache invoke. */ + CACHE_INVOKE(5), + + /** Cache lock. */ + CACHE_LOCK(6), + + /** Cache get all. */ + CACHE_GET_ALL(7), + + /** Cache put all. */ + CACHE_PUT_ALL(8), + + /** Cache remove all. */ + CACHE_REMOVE_ALL(9), + + /** Cache invoke all. */ + CACHE_INVOKE_ALL(10), + + /** Transaction commit. */ + TX_COMMIT(11), + + /** Transaction rollback. */ + TX_ROLLBACK(12), + + /** Query. */ + QUERY(13), + + /** Query reads. */ + QUERY_READS(14), + + /** Task. */ + TASK(15), + + /** Job. */ + JOB(16); + + /** Cache operations. */ + public static final EnumSet CACHE_OPS = EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE, + CACHE_GET_AND_PUT, CACHE_GET_AND_REMOVE, CACHE_INVOKE, CACHE_LOCK, CACHE_GET_ALL, CACHE_PUT_ALL, + CACHE_REMOVE_ALL, CACHE_INVOKE_ALL); + + /** Transaction operations. */ + public static final EnumSet TX_OPS = EnumSet.of(TX_COMMIT, TX_ROLLBACK); + + /** Value by identifier. */ + private static final Map VALS; + + /** Unique operation identifier. */ + private final byte id; + + /** Static initializer. */ + static { + Map vals = new HashMap<>(); + + for (OperationType op : values()) { + OperationType old = vals.put(op.id(), op); + + assert old == null : "Duplicate operation ID found [op=" + op + ']'; + } + + VALS = Collections.unmodifiableMap(vals); + } + + /** @param id Unique operation identifier. */ + OperationType(int id) { + this.id = (byte)id; + } + + /** @return Unique operation identifier. */ + public byte id() { + return id; + } + + /** @return Operation type of given identifier. */ + @Nullable public static OperationType of(byte id) { + return VALS.get(id); + } + + /** @return {@code True} if cache operation. */ + public static boolean cacheOperation(OperationType op) { + return CACHE_OPS.contains(op); + } + + /** @return {@code True} if transaction operation. */ + public static boolean transactionOperation(OperationType op) { + return TX_OPS.contains(op); + } + + /** @return Cache record size. */ + public static int cacheRecordSize() { + return 4 + 8 + 8; + } + + /** + * @param cachesIdsCnt Cache identifiers size. + * @return Transaction record size. + */ + public static int transactionRecordSize(int cachesIdsCnt) { + return 4 + cachesIdsCnt * 4 + 8 + 8; + } + + /** + * @param textLen Query text length. + * @param cached {@code True} if query text cached. + * @return Query record size. + */ + public static int queryRecordSize(int textLen, boolean cached) { + return 1 + (cached ? 4 : 4 + textLen) + 1 + 8 + 8 + 8 + 1; + } + + /** @return Query reads record size. */ + public static int queryReadsRecordSize() { + return 1 + 16 + 8 + 8 + 8; + } + + /** + * @param nameLen Task name length. + * @param cached {@code True} if task name cached. + * @return Task record size. + */ + public static int taskRecordSize(int nameLen, boolean cached) { + return 1 + (cached ? 4 : 4 + nameLen) + 24 + 8 + 8 + 4; + } + + /** @return Job record size. */ + public static int jobRecordSize() { + return 24 + 8 + 8 + 8 + 1; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/tracing/PerformanceStatisticsTracingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/tracing/PerformanceStatisticsTracingSpi.java new file mode 100644 index 00000000000000..6c5340b2b899ba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/tracing/PerformanceStatisticsTracingSpi.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.tracing; + +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.tracing.SpanType; +import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.IgniteSpiConsistencyChecked; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.tracing.TracingSpiType.PERFORMANCE_STATISTICS_TRACING_SPI; +import static org.apache.ignite.spi.tracing.PerformanceStatisticsTracingSpi.Span; +import static org.apache.ignite.spi.tracing.PerformanceStatisticsTracingSpi.Span.NOOP_INSTANCE; + +/** */ +@IgniteSpiMultipleInstancesSupport(value = true) +@IgniteSpiConsistencyChecked(optional = true) +public class PerformanceStatisticsTracingSpi extends IgniteSpiAdapter implements TracingSpi { + /** Performance statistics writer. {@code Null} if collecting statistics disabled. */ + @Nullable private volatile FilePerformanceStatisticsWriter writer; + + /** Synchronization mutex for start/stop collecting performance statistics operations. */ + private final Object mux = new Object(); + + /** {@inheritDoc} */ + @NotNull @Override public Span create(@NotNull String name, @Nullable Span parentSpan) { + if (SpanType.TX.spanName().equals(name)) + return new TxSpan(); + else if (SpanType.CACHE_GET.spanName().equals(name)) + return new CacheOpSpan(OperationType.CACHE_GET); + + return NOOP_INSTANCE; + } + + /** {@inheritDoc} */ + @Override public Span create(@NotNull String name, @Nullable byte[] serializedSpan) throws Exception { + // TODO disable serialization + assert false; + + return NOOP_INSTANCE; + } + + /** {@inheritDoc} */ + @Override public byte[] serialize(@NotNull Span span) { + // TODO disable serialization + assert false; + + return new byte[0]; + } + + /** {@inheritDoc} */ + @Override public byte type() { + return PERFORMANCE_STATISTICS_TRACING_SPI.index(); + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { + startWriter(); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + stopWriter(); + } + + /** Starts performance statistics writer. */ + private void startWriter() { + try { + synchronized (mux) { + if (writer != null) + return; + + writer = new FilePerformanceStatisticsWriter(((IgniteEx)ignite).context()); + + writer.start(); + } + + log.info("Performance statistics writer started."); + } + catch (Exception e) { + log.error("Failed to start performance statistics writer.", e); + } + } + + /** Stops performance statistics writer. */ + private void stopWriter() { + synchronized (mux) { + if (writer == null) + return; + + FilePerformanceStatisticsWriter writer = this.writer; + + this.writer = null; + + writer.stop(); + } + + log.info("Performance statistics writer stopped."); + } + + /** */ + static class Span implements SpiSpecificSpan { + /** */ + static Span NOOP_INSTANCE = new Span(); + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan addTag(String tagName, String tagVal) { + return this; + } + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan addLog(String logDesc) { + return this; + } + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan setStatus(SpanStatus spanStatus) { + return this; + } + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan end() { + return this; + } + + /** {@inheritDoc} */ + @Override public boolean isEnded() { + return true; + } + } + + /** */ + private class TrackableSpan extends Span { + /** Start time. */ + final long startTime; + + /** Start time in nanoseconds to measure duration. */ + final long startTimeNanos; + + /** Duration. */ + long duration; + + /** Flag indicates that span is ended. */ + volatile boolean ended; + + /** */ + TrackableSpan() { + startTime = U.currentTimeMillis(); + startTimeNanos = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan end() { + ended = true; + + duration = System.nanoTime() - startTimeNanos; + + return this; + } + + /** {@inheritDoc} */ + @Override public boolean isEnded() { + return ended; + } + } + + /** */ + private class CacheOpSpan extends TrackableSpan { + /** */ + private final OperationType opType; + + /** */ + private int cacheId; + + /** */ + CacheOpSpan(OperationType opType) { + this.opType = opType; + } + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan addTag(String tagName, String tagVal) { + if ("cacheId".equals(tagName)) + cacheId = Integer.parseInt(tagVal); + + return this; + } + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan end() { + super.end(); + + writer.cacheOperation(opType, cacheId, startTime, duration); + + return this; + } + } + + /** */ + private class TxSpan extends TrackableSpan { + /** */ + private boolean commited; + + /** */ + private GridIntList cacheIds; + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan addTag(String tagName, String tagVal) { + if ("commited".equals(tagName)) + commited = Boolean.parseBoolean(tagVal); + + if ("cacheIds".equals(tagName)) { + String[] ids = tagVal.substring(1, tagVal.length() - 1).split(","); + + cacheIds = new GridIntList(ids.length); + + for (String id : ids) + cacheIds.add(Integer.parseInt(id)); + } + + return this; + } + + /** {@inheritDoc} */ + @Override public SpiSpecificSpan end() { + super.end(); + + // TODO can be ended by children span + if (cacheIds == null) + return this; + + writer.transaction(cacheIds, startTime, duration, commited); + + return this; + } + } +} + diff --git a/modules/core/src/main/java/org/apache/ignite/spi/tracing/Scope.java b/modules/core/src/main/java/org/apache/ignite/spi/tracing/Scope.java index 0294183b590e8b..a912b1835eaa21 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/tracing/Scope.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/tracing/Scope.java @@ -34,7 +34,10 @@ public enum Scope { TX((short)4), /** SQL query scope. */ - SQL((short)5); + SQL((short)5), + + /** */ + CACHE_API((short)6); /** Scope index. */ private final short idx; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/tracing/TracingConfigurationManager.java b/modules/core/src/main/java/org/apache/ignite/spi/tracing/TracingConfigurationManager.java index 2915b4266c3be2..a192c52589b466 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/tracing/TracingConfigurationManager.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/tracing/TracingConfigurationManager.java @@ -64,6 +64,13 @@ public interface TracingConfigurationManager { withIncludedScopes(Collections.emptySet()). build(); + /** Default communication configuration. */ + static final TracingConfigurationParameters DEFAULT_CACHE_API_CONFIGURATION = + new TracingConfigurationParameters.Builder(). + withSamplingRate(0d). + withIncludedScopes(Collections.emptySet()). + build(); + /** Default noop configuration. */ static final TracingConfigurationParameters NOOP_CONFIGURATION = new TracingConfigurationParameters.Builder(). @@ -130,6 +137,9 @@ void set(@NotNull TracingConfigurationCoordinates coordinates, case SQL: return DEFAULT_SQL_CONFIGURATION; + case CACHE_API: + return DEFAULT_CACHE_API_CONFIGURATION; + default: { return NOOP_CONFIGURATION; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/PerfStatTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/PerfStatTest.java new file mode 100644 index 00000000000000..09516f13563bfe --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/PerfStatTest.java @@ -0,0 +1,122 @@ +package org.apache.ignite.internal.processors; + +import java.util.Collections; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.tracing.PerformanceStatisticsTracingSpi; +import org.apache.ignite.spi.tracing.Scope; +import org.apache.ignite.spi.tracing.TracingConfigurationCoordinates; +import org.apache.ignite.spi.tracing.TracingConfigurationParameters; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.junit.Test; + +import static org.apache.ignite.spi.tracing.Scope.CACHE_API; +import static org.apache.ignite.spi.tracing.TracingConfigurationParameters.SAMPLING_RATE_ALWAYS; + +public class PerfStatTest extends GridCommonAbstractTest { + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setTracingSpi(new PerformanceStatisticsTracingSpi()); + + return cfg; + } + + /** @throws Exception If failed. */ + @Test + public void test() throws Exception { + int duration = 30_000; + + IgniteEx srv = startGrids(2); + + IgniteCache cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 10_000; i++) { + cache.put(i, i); + } + + srv.tracingConfiguration().set( + new TracingConfigurationCoordinates.Builder(Scope.TX).build(), + new TracingConfigurationParameters.Builder(). + withIncludedScopes(Collections.singleton(CACHE_API)). + withSamplingRate(SAMPLING_RATE_ALWAYS).build()); + srv.tracingConfiguration().set( + new TracingConfigurationCoordinates.Builder(CACHE_API).build(), + new TracingConfigurationParameters.Builder(). + withSamplingRate(SAMPLING_RATE_ALWAYS).build()); + + + System.out.println("MY tracing cfg="+srv.tracingConfiguration().getAll(Scope.TX)); + long start = U.currentTimeMillis(); + + AtomicLong cnt = new AtomicLong(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(() -> { + while (U.currentTimeMillis() - start < duration) { + cache.get(ThreadLocalRandom.current().nextInt(10_000)); + + cnt.incrementAndGet(); + } + }, 4, "load"); + + long precCnt = 0; + + while (U.currentTimeMillis() - start < duration) { + System.out.println("MY load=" + (cnt.get() - precCnt) + " ops"); + + precCnt = cnt.get(); + + U.sleep(1000); + } + + fut.get(); + } + + + /** @throws Exception If failed. */ + @Test + public void test0() throws Exception { + IgniteEx srv = startGrids(2); + + srv.tracingConfiguration().set( + new TracingConfigurationCoordinates.Builder(Scope.TX).build(), + new TracingConfigurationParameters.Builder(). + withIncludedScopes(Collections.singleton(CACHE_API)). + withSamplingRate(SAMPLING_RATE_ALWAYS).build()); + srv.tracingConfiguration().set( + new TracingConfigurationCoordinates.Builder(CACHE_API).build(), + new TracingConfigurationParameters.Builder(). + withSamplingRate(SAMPLING_RATE_ALWAYS).build()); + + IgniteCache cache = srv.getOrCreateCache( + new CacheConfiguration<>(DEFAULT_CACHE_NAME).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); + + System.out.println("MY start TX"); + try (Transaction tx = srv.transactions().txStart()) { + U.sleep(1000); + System.out.println("MY put get START"); + cache.put(1, 1); + cache.get(1); + cache.get(1); + cache.get(1); + System.out.println("MY put get END"); + + U.sleep(1000); + System.out.println("MY commit TX"); + tx.commit(); + + U.sleep(1000); + } + System.out.println("MY close TX"); + + } +} From 88d6cdf29e2a541105f1ec4cca25ba21e119bcba Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 3 Nov 2020 13:40:30 +0300 Subject: [PATCH 2/2] WIP --- .../internal/processors/PerfStatTest.java | 122 ------------------ 1 file changed, 122 deletions(-) delete mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/PerfStatTest.java diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/PerfStatTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/PerfStatTest.java deleted file mode 100644 index 09516f13563bfe..00000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/PerfStatTest.java +++ /dev/null @@ -1,122 +0,0 @@ -package org.apache.ignite.internal.processors; - -import java.util.Collections; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.tracing.PerformanceStatisticsTracingSpi; -import org.apache.ignite.spi.tracing.Scope; -import org.apache.ignite.spi.tracing.TracingConfigurationCoordinates; -import org.apache.ignite.spi.tracing.TracingConfigurationParameters; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.transactions.Transaction; -import org.junit.Test; - -import static org.apache.ignite.spi.tracing.Scope.CACHE_API; -import static org.apache.ignite.spi.tracing.TracingConfigurationParameters.SAMPLING_RATE_ALWAYS; - -public class PerfStatTest extends GridCommonAbstractTest { - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setTracingSpi(new PerformanceStatisticsTracingSpi()); - - return cfg; - } - - /** @throws Exception If failed. */ - @Test - public void test() throws Exception { - int duration = 30_000; - - IgniteEx srv = startGrids(2); - - IgniteCache cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME); - - for (int i = 0; i < 10_000; i++) { - cache.put(i, i); - } - - srv.tracingConfiguration().set( - new TracingConfigurationCoordinates.Builder(Scope.TX).build(), - new TracingConfigurationParameters.Builder(). - withIncludedScopes(Collections.singleton(CACHE_API)). - withSamplingRate(SAMPLING_RATE_ALWAYS).build()); - srv.tracingConfiguration().set( - new TracingConfigurationCoordinates.Builder(CACHE_API).build(), - new TracingConfigurationParameters.Builder(). - withSamplingRate(SAMPLING_RATE_ALWAYS).build()); - - - System.out.println("MY tracing cfg="+srv.tracingConfiguration().getAll(Scope.TX)); - long start = U.currentTimeMillis(); - - AtomicLong cnt = new AtomicLong(); - - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(() -> { - while (U.currentTimeMillis() - start < duration) { - cache.get(ThreadLocalRandom.current().nextInt(10_000)); - - cnt.incrementAndGet(); - } - }, 4, "load"); - - long precCnt = 0; - - while (U.currentTimeMillis() - start < duration) { - System.out.println("MY load=" + (cnt.get() - precCnt) + " ops"); - - precCnt = cnt.get(); - - U.sleep(1000); - } - - fut.get(); - } - - - /** @throws Exception If failed. */ - @Test - public void test0() throws Exception { - IgniteEx srv = startGrids(2); - - srv.tracingConfiguration().set( - new TracingConfigurationCoordinates.Builder(Scope.TX).build(), - new TracingConfigurationParameters.Builder(). - withIncludedScopes(Collections.singleton(CACHE_API)). - withSamplingRate(SAMPLING_RATE_ALWAYS).build()); - srv.tracingConfiguration().set( - new TracingConfigurationCoordinates.Builder(CACHE_API).build(), - new TracingConfigurationParameters.Builder(). - withSamplingRate(SAMPLING_RATE_ALWAYS).build()); - - IgniteCache cache = srv.getOrCreateCache( - new CacheConfiguration<>(DEFAULT_CACHE_NAME).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); - - System.out.println("MY start TX"); - try (Transaction tx = srv.transactions().txStart()) { - U.sleep(1000); - System.out.println("MY put get START"); - cache.put(1, 1); - cache.get(1); - cache.get(1); - cache.get(1); - System.out.println("MY put get END"); - - U.sleep(1000); - System.out.println("MY commit TX"); - tx.commit(); - - U.sleep(1000); - } - System.out.println("MY close TX"); - - } -}