From 4cadd07924e505311f6b3b352860627998ff95f2 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Wed, 25 Jan 2023 18:42:07 +0300 Subject: [PATCH] add max size cfg --- .../DataStorageConfiguration.java | 33 ++++++++++ .../apache/ignite/internal/cdc/CdcMain.java | 5 +- .../wal/FileWriteAheadLogManager.java | 31 ++++++++- .../org/apache/ignite/cdc/CdcSelfTest.java | 66 ++++++++++++++++++- 4 files changed, 131 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index b2b6d228588d71..8daa97a1c7b642 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -164,6 +164,9 @@ public class DataStorageConfiguration implements Serializable { /** Default change data capture directory. */ public static final String DFLT_WAL_CDC_PATH = "db/wal/cdc"; + /** Default change data capture directory maximum size. */ + public static final long DFLT_CDC_WAL_DIRECTORY_MAX_SIZE = 0; + /** Default path (relative to working directory) of binary metadata folder */ public static final String DFLT_BINARY_METADATA_PATH = "db/binary_meta"; @@ -245,6 +248,10 @@ public class DataStorageConfiguration implements Serializable { @IgniteExperimental private String cdcWalPath = DFLT_WAL_CDC_PATH; + /** Change Data Capture directory size limit. */ + @IgniteExperimental + private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE; + /** * Metrics enabled flag. * @deprecated Will be removed in upcoming releases. @@ -801,6 +808,32 @@ public DataStorageConfiguration setCdcWalPath(String cdcWalPath) { return this; } + /** + * Sets the {@link #getCdcWalPath CDC directory} maximum size in bytes. + * + * @return CDC directory maximum size in bytes. + */ + @IgniteExperimental + public long getCdcWalDirectoryMaxSize() { + return cdcWalDirMaxSize; + } + + /** + * Sets the CDC directory maximum size in bytes. Zero or negative means no limit. Creation of segment CDC link + * will be skipped when the total size of CDC files in the {@link #getCdcWalPath directory} exceeds the limit. + * The CDC application will log an error due to a gap in wal files sequence. Note that cache changes will be lost. + * Default is no limit. + * + * @param cdcWalDirMaxSize CDC directory maximum size in bytes. + * @return {@code this} for chaining. + */ + @IgniteExperimental + public DataStorageConfiguration setCdcWalDirectoryMaxSize(long cdcWalDirMaxSize) { + this.cdcWalDirMaxSize = cdcWalDirMaxSize; + + return this; + } + /** * Gets flag indicating whether persistence metrics collection is enabled. * Default value is {@link #DFLT_METRICS_ENABLED}. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index e16e1b409f3de4..b7d449fa6a89ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -442,7 +442,10 @@ public void consumeWalSegmentsUntilStopped() { .peek(p -> { long nextSgmnt = segmentIndex(p); - assert lastSgmnt.get() == -1 || nextSgmnt - lastSgmnt.get() == 1; + if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) { + throw new IgniteException("Found missed segments. Some events are missed. " + + "[lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']'); + } lastSgmnt.set(nextSgmnt); }) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 0dc36ad6174c6d..619a84ef0fe43b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -2140,8 +2140,14 @@ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException Files.move(dstTmpFile.toPath(), dstFile.toPath()); if (walCdcDir != null) { - if (!cdcDisabled.getOrDefault(false)) - Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); + if (!cdcDisabled.getOrDefault(false)) { + if (checkCdcWalDirectorySize(dstFile.length())) + Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); + else { + log.error("Creation of segment CDC link skipped. Configured CDC directory " + + "maximum size exceeded."); + } + } else { log.warning("Creation of segment CDC link skipped. " + "'" + CDC_DISABLED + "' distributed property is 'true'."); @@ -2212,6 +2218,27 @@ public void restart() { new IgniteThread(archiver).start(); } + + /** + * @param len Length of file to check size. + * @return {@code True} if the CDC directory size check successful, otherwise {@code false}. + */ + private boolean checkCdcWalDirectorySize(long len) { + long maxDirSize = igCfg.getDataStorageConfiguration().getCdcWalDirectoryMaxSize(); + + if (maxDirSize <= 0) + return true; + + long dirSize = Arrays.stream(walCdcDir.listFiles(WAL_SEGMENT_FILE_FILTER)).mapToLong(File::length).sum(); + + if (dirSize + len <= maxDirSize) + return true; + + log.warning("Configured CDC WAL directory maximum size exceeded [curDirSize=" + dirSize + + ", fileLength=" + len + ", cdcWalDirectoryMaxSize=" + maxDirSize + ']'); + + return false; + } } /** diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 369a7663aa1440..6e748c2eb93636 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -46,8 +47,11 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; @@ -55,6 +59,7 @@ import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.WithSystemProperty; @@ -68,8 +73,11 @@ import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE; import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE; import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CDC_WAL_DIRECTORY_MAX_SIZE; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static org.apache.ignite.testframework.GridTestUtils.runAsync; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.junit.Assume.assumeTrue; @@ -95,6 +103,9 @@ public class CdcSelfTest extends AbstractCdcTest { @Parameterized.Parameter(2) public boolean persistenceEnabled; + /** */ + private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE; + /** */ @Parameterized.Parameters(name = "consistentId={0}, wal={1}, persistence={2}") public static Collection parameters() { @@ -121,7 +132,8 @@ public static Collection parameters() { .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setPersistenceEnabled(persistenceEnabled) .setCdcEnabled(true)) - .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName))); + .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName)) + .setCdcWalDirectoryMaxSize(cdcWalDirMaxSize)); cfg.setCacheConfiguration( new CacheConfiguration<>(TX_CACHE_NAME) @@ -726,6 +738,58 @@ public void testDisable() throws Exception { assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT)); } + /** */ + @Test + public void testCdcDirectoryMaxSize() throws Exception { + cdcWalDirMaxSize = 10 * U.MB; + int segmentSize = (int)(cdcWalDirMaxSize / 2); + + IgniteEx ign = startGrid(0); + + ign.cluster().state(ACTIVE); + + IgniteCache cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + IgniteWriteAheadLogManager wal = ign.context().cache().context().wal(true); + File walCdcDir = U.field(ign.context().cache().context().wal(true), "walCdcDir"); + + RunnableX writeSgmnt = () -> { + int sgmnts = wal.walArchiveSegments(); + int dataSize = (int)(segmentSize * 0.8); + + for (int i = 0; i < dataSize / DFLT_PAGE_SIZE; i++) + wal.log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE], 1)); + + addData(cache, 0, 1); + + waitForCondition(() -> wal.walArchiveSegments() > sgmnts, 2 * WAL_ARCHIVE_TIMEOUT); + }; + + // Write to the WAL to exceed the configured max size. + writeSgmnt.run(); + writeSgmnt.run(); + + // The segment link creation should be skipped. + writeSgmnt.run(); + + assertTrue(cdcWalDirMaxSize >= Arrays.stream(walCdcDir.listFiles()).mapToLong(File::length).sum()); + + UserCdcConsumer cnsmr = new UserCdcConsumer(); + + CdcMain cdc = createCdc(cnsmr, getConfiguration(ign.name())); + + IgniteInternalFuture fut = runAsync(cdc); + + waitForSize(2, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + + assertFalse(fut.isDone()); + + // Write next segment after skipped. + writeSgmnt.run(); + + assertThrows(log, () -> fut.get(getTestTimeout()), IgniteCheckedException.class, + "Found missed segments. Some events are missed."); + } + /** */ public static void addData(IgniteCache cache, int from, int to) { for (int i = from; i < to; i++)