Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ public class DataStorageConfiguration implements Serializable {
/** Default change data capture directory. */
public static final String DFLT_WAL_CDC_PATH = "db/wal/cdc";

/** Default change data capture directory maximum size. */
public static final long DFLT_CDC_WAL_DIRECTORY_MAX_SIZE = 0;

/** Default path (relative to working directory) of binary metadata folder */
public static final String DFLT_BINARY_METADATA_PATH = "db/binary_meta";

Expand Down Expand Up @@ -245,6 +248,10 @@ public class DataStorageConfiguration implements Serializable {
@IgniteExperimental
private String cdcWalPath = DFLT_WAL_CDC_PATH;

/** Change Data Capture directory size limit. */
@IgniteExperimental
private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;

/**
* Metrics enabled flag.
* @deprecated Will be removed in upcoming releases.
Expand Down Expand Up @@ -801,6 +808,32 @@ public DataStorageConfiguration setCdcWalPath(String cdcWalPath) {
return this;
}

/**
* Sets the {@link #getCdcWalPath CDC directory} maximum size in bytes.
*
* @return CDC directory maximum size in bytes.
*/
@IgniteExperimental
public long getCdcWalDirectoryMaxSize() {
return cdcWalDirMaxSize;
}

/**
* Sets the CDC directory maximum size in bytes. Zero or negative means no limit. Creation of segment CDC link
* will be skipped when the total size of CDC files in the {@link #getCdcWalPath directory} exceeds the limit.
* The CDC application will log an error due to a gap in wal files sequence. Note that cache changes will be lost.
* Default is no limit.
*
* @param cdcWalDirMaxSize CDC directory maximum size in bytes.
* @return {@code this} for chaining.
*/
@IgniteExperimental
public DataStorageConfiguration setCdcWalDirectoryMaxSize(long cdcWalDirMaxSize) {
this.cdcWalDirMaxSize = cdcWalDirMaxSize;

return this;
}

/**
* Gets flag indicating whether persistence metrics collection is enabled.
* Default value is {@link #DFLT_METRICS_ENABLED}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,10 @@ public void consumeWalSegmentsUntilStopped() {
.peek(p -> {
long nextSgmnt = segmentIndex(p);

assert lastSgmnt.get() == -1 || nextSgmnt - lastSgmnt.get() == 1;
if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) {
throw new IgniteException("Found missed segments. Some events are missed. " +
"[lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']');
}

lastSgmnt.set(nextSgmnt);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2140,8 +2140,14 @@ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException
Files.move(dstTmpFile.toPath(), dstFile.toPath());

if (walCdcDir != null) {
if (!cdcDisabled.getOrDefault(false))
Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath());
if (!cdcDisabled.getOrDefault(false)) {
if (checkCdcWalDirectorySize(dstFile.length()))
Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath());
else {
log.error("Creation of segment CDC link skipped. Configured CDC directory " +
"maximum size exceeded.");
}
}
else {
log.warning("Creation of segment CDC link skipped. " +
"'" + CDC_DISABLED + "' distributed property is 'true'.");
Expand Down Expand Up @@ -2212,6 +2218,27 @@ public void restart() {

new IgniteThread(archiver).start();
}

/**
* @param len Length of file to check size.
* @return {@code True} if the CDC directory size check successful, otherwise {@code false}.
*/
private boolean checkCdcWalDirectorySize(long len) {
long maxDirSize = igCfg.getDataStorageConfiguration().getCdcWalDirectoryMaxSize();

if (maxDirSize <= 0)
return true;

long dirSize = Arrays.stream(walCdcDir.listFiles(WAL_SEGMENT_FILE_FILTER)).mapToLong(File::length).sum();

if (dirSize + len <= maxDirSize)
return true;

log.warning("Configured CDC WAL directory maximum size exceeded [curDirSize=" + dirSize +
", fileLength=" + len + ", cdcWalDirectoryMaxSize=" + maxDirSize + ']');

return false;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
Expand All @@ -46,15 +47,19 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.WithSystemProperty;
Expand All @@ -68,8 +73,11 @@
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.junit.Assume.assumeTrue;
Expand All @@ -95,6 +103,9 @@ public class CdcSelfTest extends AbstractCdcTest {
@Parameterized.Parameter(2)
public boolean persistenceEnabled;

/** */
private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;

/** */
@Parameterized.Parameters(name = "consistentId={0}, wal={1}, persistence={2}")
public static Collection<?> parameters() {
Expand All @@ -121,7 +132,8 @@ public static Collection<?> parameters() {
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(persistenceEnabled)
.setCdcEnabled(true))
.setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName)));
.setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName))
.setCdcWalDirectoryMaxSize(cdcWalDirMaxSize));

cfg.setCacheConfiguration(
new CacheConfiguration<>(TX_CACHE_NAME)
Expand Down Expand Up @@ -726,6 +738,58 @@ public void testDisable() throws Exception {
assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT));
}

/** */
@Test
public void testCdcDirectoryMaxSize() throws Exception {
cdcWalDirMaxSize = 10 * U.MB;
int segmentSize = (int)(cdcWalDirMaxSize / 2);

IgniteEx ign = startGrid(0);

ign.cluster().state(ACTIVE);

IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);
IgniteWriteAheadLogManager wal = ign.context().cache().context().wal(true);
File walCdcDir = U.field(ign.context().cache().context().wal(true), "walCdcDir");

RunnableX writeSgmnt = () -> {
int sgmnts = wal.walArchiveSegments();
int dataSize = (int)(segmentSize * 0.8);

for (int i = 0; i < dataSize / DFLT_PAGE_SIZE; i++)
wal.log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE], 1));

addData(cache, 0, 1);

waitForCondition(() -> wal.walArchiveSegments() > sgmnts, 2 * WAL_ARCHIVE_TIMEOUT);
};

// Write to the WAL to exceed the configured max size.
writeSgmnt.run();
writeSgmnt.run();

// The segment link creation should be skipped.
writeSgmnt.run();

assertTrue(cdcWalDirMaxSize >= Arrays.stream(walCdcDir.listFiles()).mapToLong(File::length).sum());

UserCdcConsumer cnsmr = new UserCdcConsumer();

CdcMain cdc = createCdc(cnsmr, getConfiguration(ign.name()));

IgniteInternalFuture<?> fut = runAsync(cdc);

waitForSize(2, DEFAULT_CACHE_NAME, UPDATE, cnsmr);

assertFalse(fut.isDone());

// Write next segment after skipped.
writeSgmnt.run();

assertThrows(log, () -> fut.get(getTestTimeout()), IgniteCheckedException.class,
"Found missed segments. Some events are missed.");
}

/** */
public static void addData(IgniteCache<Integer, User> cache, int from, int to) {
for (int i = from; i < to; i++)
Expand Down