From 8221c05db40392c9cc989256b8a35cd127a3e995 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Wed, 25 Jan 2023 18:45:38 +0300 Subject: [PATCH 01/13] add command --- .../persistence/change-data-capture.adoc | 18 +- .../internal/commandline/CommandList.java | 6 +- .../internal/commandline/CommonArgParser.java | 2 +- .../internal/commandline/cdc/CdcCommand.java | 126 ++++++++++ .../IgniteControlUtilityTestSuite2.java | 5 +- .../apache/ignite/util/CdcCommandTest.java | 233 ++++++++++++++++++ .../util/GridCommandHandlerAbstractTest.java | 15 ++ .../apache/ignite/util/MetricCommandTest.java | 15 -- .../ignite/util/SystemViewCommandTest.java | 15 -- .../apache/ignite/internal/cdc/CdcMain.java | 4 +- .../wal/FileWriteAheadLogManager.java | 5 + .../cdc/VisorCdcDeleteLostSegmentsTask.java | 154 ++++++++++++ .../org/apache/ignite/cdc/CdcSelfTest.java | 16 ++ ...mmandHandlerClusterByClassTest_help.output | 4 + ...ndlerClusterByClassWithSSLTest_help.output | 4 + 15 files changed, 586 insertions(+), 36 deletions(-) create mode 100644 modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java create mode 100644 modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index adbfe1e47063a..763128131b750 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -142,7 +142,23 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c 5. Infinitely wait for the newly available segment and process it. 6. Stop the consumer in case of a failure or a received stop signal. +== Handling skipped segments + +The CDC can be disabled manually or by configured directory maximum size. In this case a hard link creation will be skipped. + +NOTE: Note that cache changes in skipped segments will be lost. + +The CDC application fails if found missing segments. + +The link:tools/control-script[Control Script] script provides the ability to delete lost segments before a last gap in segments: + +[source,shell] +---- +# Delete lost segment links on a node. +control.sh|bat --cdc delete_lost_segment_links --node-id node_id +---- + == cdc-ext Ignite extensions project has link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[cdc-ext] module which provides two way to setup cross cluster replication based on CDC. -Detailed documentation can be found on link:extensions-and-integrations/change-data-capture-extensions[page]. \ No newline at end of file +Detailed documentation can be found on link:extensions-and-integrations/change-data-capture-extensions[page]. diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java index df93519469f12..2c7b78ce9bd7a 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.commandline; import org.apache.ignite.internal.commandline.cache.CacheCommands; +import org.apache.ignite.internal.commandline.cdc.CdcCommand; import org.apache.ignite.internal.commandline.consistency.ConsistencyCommand; import org.apache.ignite.internal.commandline.diagnostic.DiagnosticCommand; import org.apache.ignite.internal.commandline.encryption.EncryptionCommands; @@ -103,7 +104,10 @@ public enum CommandList { PERFORMANCE_STATISTICS("--performance-statistics", new PerformanceStatisticsCommand()), /** Command to check/repair consistency. */ - CONSISTENCY("--consistency", new ConsistencyCommand()); + CONSISTENCY("--consistency", new ConsistencyCommand()), + + /** Cdc commands. */ + CDC("--cdc", new CdcCommand()); /** Private values copy so there's no need in cloning it every time. */ private static final CommandList[] VALUES = CommandList.values(); diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java index 2acdc404052ba..238a3a7804620 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java @@ -57,7 +57,7 @@ public class CommonArgParser { static final String CMD_USER = "--user"; /** Option is used for auto confirmation. */ - static final String CMD_AUTO_CONFIRMATION = "--yes"; + public static final String CMD_AUTO_CONFIRMATION = "--yes"; /** */ static final String CMD_PING_INTERVAL = "--ping-interval"; diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java new file mode 100644 index 0000000000000..d1814e2703662 --- /dev/null +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.commandline.cdc; + +import java.util.UUID; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.commandline.AbstractCommand; +import org.apache.ignite.internal.commandline.Command; +import org.apache.ignite.internal.commandline.CommandArgIterator; +import org.apache.ignite.internal.commandline.CommandLogger; +import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask; + +import static org.apache.ignite.internal.commandline.CommandList.CDC; +import static org.apache.ignite.internal.commandline.CommandLogger.optional; +import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION; +import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode; + +/** + * CDC command. + */ +public class CdcCommand extends AbstractCommand { + /** Command to delete lost segment links. */ + public static final String DELETE_LOST_SEGMENT_LINKS = "delete_lost_segment_links"; + + /** */ + public static final String NODE_ID = "--node-id"; + + /** Node ID. */ + private UUID nodeId; + + /** {@inheritDoc} */ + @Override public Object execute(GridClientConfiguration clientCfg, IgniteLogger log) throws Exception { + try (GridClient client = Command.startClient(clientCfg)) { + executeTaskByNameOnNode( + client, + VisorCdcDeleteLostSegmentsTask.class.getName(), + null, + nodeId, + clientCfg + ); + + String res = "Lost segment CDC links successfully removed."; + + log.info(res); + + return res; + } + catch (Throwable e) { + log.error("Failed to perform operation."); + log.error(CommandLogger.errorMessage(e)); + + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void parseArguments(CommandArgIterator argIter) { + nodeId = null; + + String cmd = argIter.nextArg("Expected command: " + DELETE_LOST_SEGMENT_LINKS); + + if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd)) + throw new IllegalArgumentException("Unexpected command: " + cmd); + + while (argIter.hasNextSubArg()) { + String opt = argIter.nextArg("Failed to read command argument."); + + if (NODE_ID.equalsIgnoreCase(opt)) { + try { + nodeId = UUID.fromString(argIter.nextArg("Expected node ID argument.")); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Failed to parse " + NODE_ID + " command argument." + + " String representation of \"java.util.UUID\" is exepected. For example:" + + " 123e4567-e89b-42d3-a456-556642440000", e); + } + } + } + + if (nodeId == null) + throw new IllegalArgumentException("Expected node ID option: " + NODE_ID); + } + + /** {@inheritDoc} */ + @Override public String confirmationPrompt() { + return "Warning: the command will delete lost segment CDC links. Cache events from these segments will be lost."; + } + + /** {@inheritDoc} */ + @Override public String arg() { + return null; + } + + /** {@inheritDoc} */ + @Override public void printUsage(IgniteLogger logger) { + usage(logger, "Delete lost segment links on a node:", CDC, DELETE_LOST_SEGMENT_LINKS, NODE_ID, "node_id", + optional(CMD_AUTO_CONFIRMATION)); + } + + /** {@inheritDoc} */ + @Override public String name() { + return "cdc"; + } + + /** {@inheritDoc} */ + @Override public boolean experimental() { + return true; + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index 3b7ac34cb5e47..ce8571c4ef42e 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -19,6 +19,7 @@ import org.apache.ignite.internal.commandline.indexreader.IgniteIndexReaderTest; import org.apache.ignite.util.CacheMetricsCommandTest; +import org.apache.ignite.util.CdcCommandTest; import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest; import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest; import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest; @@ -64,7 +65,9 @@ PerformanceStatisticsCommandTest.class, CacheMetricsCommandTest.class, - IgniteIndexReaderTest.class + IgniteIndexReaderTest.class, + + CdcCommandTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java new file mode 100644 index 0000000000000..fbaae2e7ed915 --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.io.File; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.cdc.AbstractCdcTest; +import org.apache.ignite.cdc.CdcConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.commandline.CommandList; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT; +import static org.apache.ignite.cdc.CdcSelfTest.addData; +import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; +import static org.apache.ignite.testframework.GridTestUtils.assertContains; + +/** + * CDC command tests. + */ +public class CdcCommandTest extends GridCommandHandlerAbstractTest { + /** */ + private IgniteEx srv0; + + /** */ + private IgniteEx srv1; + + /** */ + private DistributedChangeableProperty cdcDisabled; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setBackups(1)); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setWalForceArchiveTimeout(1000) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setCdcEnabled(true))); + + cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + + srv0 = startGrid(0); + srv1 = startGrid(1); + + cdcDisabled = srv0.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void testParseDeleteLostSegmentLinks() { + injectTestSystemOut(); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), "unexpected_command"), + "Unexpected command: unexpected_command"); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS), + "Expected node ID option: " + NODE_ID); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID), + "Failed to parse " + NODE_ID + " command argument."); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, "10"), + "Failed to parse " + NODE_ID + " command argument."); + } + + /** */ + @Test + public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception { + injectTestSystemOut(); + + CountDownLatch appStarted = new CountDownLatch(1); + + CdcConfiguration cfg = new CdcConfiguration(); + + cfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() { + @Override public void start(MetricRegistry mreg) { + appStarted.countDown(); + } + }); + + CdcMain cdc = new CdcMain(getConfiguration(getTestIgniteInstanceName(0)), null, cfg); + + IgniteInternalFuture fut = GridTestUtils.runAsync(cdc); + + appStarted.await(getTestTimeout(), TimeUnit.MILLISECONDS); + + assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR, + CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, srv0.localNode().id().toString()), + "Failed to delete lost segment CDC links. Unable to acquire lock to lock CDC folder."); + + assertFalse(fut.isDone()); + + fut.cancel(); + } + + /** */ + @Test + public void testDeleteLostSegmentLinks() throws Exception { + archiveSegment(); + + cdcDisabled.propagate(true); + archiveSegment(); + cdcDisabled.propagate(false); + + archiveSegment(); + + checkSegmentLinks(F.asList(0L, 2L), F.asList(2L)); + } + + /** */ + @Test + public void testDeleteLostSegmentLinksMultipleGaps() throws Exception { + archiveSegment(); + + cdcDisabled.propagate(true); + archiveSegment(); + archiveSegment(); + cdcDisabled.propagate(false); + + archiveSegment(); + + cdcDisabled.propagate(true); + archiveSegment(); + cdcDisabled.propagate(false); + + archiveSegment(); + + checkSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L)); + } + + /** */ + private void checkSegmentLinks(List expBefore, List expAfter) { + checkFiles(srv0, expBefore); + checkFiles(srv1, expBefore); + + executeCommand(EXIT_CODE_OK, + CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, srv0.localNode().id().toString()); + + checkFiles(srv0, expAfter); + checkFiles(srv1, expBefore); // The command executed for srv0 only. + } + + /** */ + private void checkFiles(IgniteEx srv, List expLinks) { + FileWriteAheadLogManager wal0 = (FileWriteAheadLogManager)srv.context().cache().context().wal(true); + + File[] links = wal0.walCdcDirectory().listFiles(WAL_SEGMENT_FILE_FILTER); + + assertEquals(expLinks.size(), links.length); + Arrays.stream(links).map(File::toPath).map(CdcMain::segmentIndex) + .allMatch(expLinks::contains); + } + + /** */ + private void archiveSegment() throws Exception { + CountDownLatch latch = new CountDownLatch(G.allGrids().size()); + + for (Ignite srv : G.allGrids()) { + srv.events().localListen(evt -> { + latch.countDown(); + + return false; + }, EVT_WAL_SEGMENT_ARCHIVED); + } + + addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java index 3dc6ed6e2145c..e3555a2e31074 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java @@ -314,6 +314,21 @@ protected int execute(String... args) { return execute(new ArrayList<>(asList(args))); } + /** + * Executes command and checks its exit code. + * + * @param expExitCode Expected exit code. + * @param args Command lines arguments. + * @return Result of command execution. + */ + protected String executeCommand(int expExitCode, String... args) { + int res = execute(args); + + assertEquals(expExitCode, res); + + return testOut.toString(); + } + /** * Before command executed {@link #testOut} reset. * diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java index b173fad8ddbb4..beb7aab002bdc 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java @@ -465,19 +465,4 @@ private Map parseMetricCommandOutput(String out) { return res; } - - /** - * Executes command and checks its exit code. - * - * @param expExitCode Expected exit code. - * @param args Command lines arguments. - * @return Result of command execution. - */ - private String executeCommand(int expExitCode, String... args) { - int res = execute(args); - - assertEquals(expExitCode, res); - - return testOut.toString(); - } } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java index 67ce487de1e8b..bf788fc6b277a 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java @@ -1303,19 +1303,4 @@ private Map>> parseSystemViewCommandOutput(String out) { return res; } - - /** - * Executes command and checks its exit code. - * - * @param expExitCode Expected exit code. - * @param args Command lines arguments. - * @return Result of command execution. - */ - private String executeCommand(int expExitCode, String... args) { - int res = execute(args); - - assertEquals(expExitCode, res); - - return testOut.toString(); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index b7d449fa6a89a..18e768932b0da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -438,7 +438,7 @@ public void consumeWalSegmentsUntilStopped() { // Need unseen WAL segments only. .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p)) .peek(seen::add) // Adds to seen. - .sorted(Comparator.comparingLong(this::segmentIndex)) // Sort by segment index. + .sorted(Comparator.comparingLong(CdcMain::segmentIndex)) // Sort by segment index. .peek(p -> { long nextSgmnt = segmentIndex(p); @@ -810,7 +810,7 @@ private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) { * @param segment WAL segment file. * @return Segment index. */ - public long segmentIndex(Path segment) { + public static long segmentIndex(Path segment) { String fn = segment.getFileName().toString(); return Long.parseLong(fn.substring(0, fn.indexOf('.'))); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 619a84ef0fe43..1c07f5e279ce2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -3301,6 +3301,11 @@ public static long totalSize(FileDescriptor... fileDescriptors) { return len; } + /** @return WAL cdc directory (including consistent ID as subfolder) */ + @Nullable public File walCdcDirectory() { + return walCdcDir; + } + /** * Check if WAL archive is unlimited. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java new file mode 100644 index 0000000000000..6b1c0eb8d871b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.cdc; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.cdc.CdcFileLockHolder; +import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorOneNodeTask; +import org.apache.ignite.resources.LoggerResource; + +import static org.apache.ignite.internal.cdc.CdcConsumerState.WAL_STATE_FILE_NAME; +import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; + +/** + * Task to delete lost segment CDC links. + */ +public class VisorCdcDeleteLostSegmentsTask extends VisorOneNodeTask { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob job(Void arg) { + return new VisorCdcDeleteLostSegmentsJob(arg, false); + } + + /** */ + private static class VisorCdcDeleteLostSegmentsJob extends VisorJob { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected logger. */ + @LoggerResource + protected IgniteLogger log; + + /** + * Create job with specified argument. + * + * @param arg Job argument. + * @param debug Flag indicating whether debug information should be printed into node log. + */ + protected VisorCdcDeleteLostSegmentsJob(Void arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Void run(Void arg) throws IgniteException { + FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ignite.context().cache().context().wal(true); + + File walCdcDir = wal.walCdcDirectory(); + + if (walCdcDir == null) + throw new IgniteException("CDC is not configured."); + + CdcFileLockHolder lock = new CdcFileLockHolder(walCdcDir.getAbsolutePath(), "Delete lost segments job", log); + + try { + lock.tryLock(1); + + try (Stream cdcFiles = Files.list(walCdcDir.toPath())) { + Set delete = new HashSet<>(); + + AtomicLong lastSgmnt = new AtomicLong(-1); + + cdcFiles + .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) + .sorted(Comparator.comparingLong(CdcMain::segmentIndex) + .reversed()) // Sort by segment index. + .forEach(path -> { + long idx = CdcMain.segmentIndex(path); + + if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { + lastSgmnt.set(idx); + + return; + } + + delete.add(path.toFile()); + }); + + if (delete.isEmpty()) { + log.info("Lost segment CDC links were not found."); + + return null; + } + + log.info("Found lost segment CDC links. The following links will be deleted: " + delete); + + delete.forEach(file -> { + if (!file.delete()) { + throw new IgniteException("Failed to delete lost segment CDC link [file=" + + file.getAbsolutePath() + ']'); + } + + log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']'); + }); + + Path stateDir = walCdcDir.toPath().resolve(STATE_DIR); + + if (stateDir.toFile().exists()) { + File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); + + if (walState.exists() && !walState.delete()) { + throw new IgniteException("Failed to delete wal state file [file=" + + walState.getAbsolutePath() + ']'); + } + } + } + catch (IOException e) { + throw new RuntimeException("Failed to delete lost segment CDC links.", e); + } + } + catch (IgniteCheckedException e) { + throw new RuntimeException("Failed to delete lost segment CDC links. " + + "Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " + + "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']'); + } + finally { + U.closeQuiet(lock); + } + + return null; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 6e748c2eb9363..8d7ac46db2885 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -62,6 +62,8 @@ import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.junit.Test; import org.junit.runner.RunWith; @@ -788,6 +790,20 @@ public void testCdcDirectoryMaxSize() throws Exception { assertThrows(log, () -> fut.get(getTestTimeout()), IgniteCheckedException.class, "Found missed segments. Some events are missed."); + + ign.compute().execute(VisorCdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); + + cnsmr.data.clear(); + + cdc = createCdc(cnsmr, getConfiguration(ign.name())); + + IgniteInternalFuture f = runAsync(cdc); + + waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + + assertFalse(f.isDone()); + + f.cancel(); } /** */ diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index 08f1081d34c8c..80ae8efae3e27 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -343,6 +343,10 @@ If the file name isn't specified the output file name is: '.bin' Finalize partitions update counters: control.(sh|bat) --consistency finalize + [EXPERIMENTAL] + Delete lost segment links on a node: + control.(sh|bat) --cdc delete_lost_segment_links --node-id node_id [--yes] + By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output index 08f1081d34c8c..80ae8efae3e27 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output @@ -343,6 +343,10 @@ If the file name isn't specified the output file name is: '.bin' Finalize partitions update counters: control.(sh|bat) --consistency finalize + [EXPERIMENTAL] + Delete lost segment links on a node: + control.(sh|bat) --cdc delete_lost_segment_links --node-id node_id [--yes] + By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. From ccdf8238f1307a05512db6d7a8ca46e942865e10 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Wed, 25 Jan 2023 22:16:32 +0300 Subject: [PATCH 02/13] fix tests --- .../commandline/CommandHandlerParsingTest.java | 17 ++++++++++++++++- .../GridCommandHandlerClusterByClassTest.java | 8 ++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java index fa478da5fd249..c2ee56ce044b7 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java @@ -64,6 +64,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND; import static org.apache.ignite.internal.QueryMXBeanImpl.EXPECTED_GLOBAL_QRY_ID_FORMAT; import static org.apache.ignite.internal.commandline.CommandList.CACHE; +import static org.apache.ignite.internal.commandline.CommandList.CDC; import static org.apache.ignite.internal.commandline.CommandList.CLUSTER_CHANGE_TAG; import static org.apache.ignite.internal.commandline.CommandList.SET_STATE; import static org.apache.ignite.internal.commandline.CommandList.SHUTDOWN_POLICY; @@ -78,6 +79,8 @@ import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES; import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST; import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -412,6 +415,8 @@ else if (cmdL == CLUSTER_CHANGE_TAG) args = parseArgs(asList(cmdL.text(), "newTagValue")); else if (cmdL == WARM_UP) args = parseArgs(asList(cmdL.text(), "--stop")); + else if (cmdL == CDC) + args = parseArgs(asList(cmdL.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, UUID.randomUUID().toString())); else args = parseArgs(asList(cmdL.text())); @@ -497,6 +502,15 @@ else if (cmdL == WARM_UP) break; } + case CDC: { + args = parseArgs(asList(cmdL.text(), DELETE_LOST_SEGMENT_LINKS, + NODE_ID, UUID.randomUUID().toString(), "--yes")); + + checkCommonParametersCorrectlyParsed(cmdL, args, true); + + break; + } + default: fail("Unknown command: " + cmd); } @@ -1232,6 +1246,7 @@ private boolean requireArgs(@Nullable CommandList cmd) { cmd == CommandList.METRIC || cmd == CommandList.DEFRAGMENTATION || cmd == CommandList.PERFORMANCE_STATISTICS || - cmd == CommandList.CONSISTENCY; + cmd == CommandList.CONSISTENCY || + cmd == CDC; } } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java index d40f05bfcbb13..8bf760eed5137 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.EnumMap; import java.util.HashSet; import java.util.List; @@ -92,6 +91,7 @@ import static java.util.Arrays.asList; import static java.util.Arrays.stream; +import static java.util.Collections.singletonList; import static java.util.Objects.nonNull; import static java.util.stream.Collectors.toList; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND; @@ -104,6 +104,7 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; import static org.apache.ignite.internal.commandline.CommandHandler.UTILITY_NAME; import static org.apache.ignite.internal.commandline.CommandList.BASELINE; +import static org.apache.ignite.internal.commandline.CommandList.CDC; import static org.apache.ignite.internal.commandline.CommandList.CONSISTENCY; import static org.apache.ignite.internal.commandline.CommandList.METADATA; import static org.apache.ignite.internal.commandline.CommandList.TRACING_CONFIGURATION; @@ -115,6 +116,8 @@ import static org.apache.ignite.internal.commandline.cache.CacheDestroy.DESTROY_ALL_ARG; import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.DESTROY; import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.HELP; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.CACHE; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.PARTITIONS; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.STRATEGY; @@ -1729,11 +1732,12 @@ public void testContainsWarnInsteadExecExperimentalCmdWhenEnableExperimentalFals cmdArgs.put(WAL, asList(new String[] {"print"}, new String[] {"delete"})); cmdArgs.put(METADATA, asList(new String[] {"help"}, new String[] {"list"})); - cmdArgs.put(TRACING_CONFIGURATION, Collections.singletonList(new String[] {"get_all"})); + cmdArgs.put(TRACING_CONFIGURATION, singletonList(new String[] {"get_all"})); cmdArgs.put(CONSISTENCY, asList( new String[] {"repair", CACHE, "cache", PARTITIONS, "0", STRATEGY, "LWW"}, new String[] {"status"}, new String[] {"finalize"})); + cmdArgs.put(CDC, singletonList(new String[] {DELETE_LOST_SEGMENT_LINKS, NODE_ID, UUID.randomUUID().toString()})); String warning = String.format( "To use experimental command add --enable-experimental parameter for %s", From 76504371325803341e6a9162b1367478877a0122 Mon Sep 17 00:00:00 2001 From: Ilhom Ulmasov Date: Thu, 26 Jan 2023 09:11:25 +0300 Subject: [PATCH 03/13] IGNITE-18511 SQL Calcite: Add details to Calcite parser exception - Fixes #10479. Signed-off-by: Aleksey Plekhanov --- .../processors/query/calcite/util/Commons.java | 2 +- .../integration/TableDdlIntegrationTest.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index fb51d5e9586df..442ca86f000cd 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -204,7 +204,7 @@ public static SqlNodeList parse(String qry, SqlParser.Config parserCfg) { return parse(new SourceStringReader(qry), parserCfg); } catch (SqlParseException e) { - throw new IgniteSQLException("Failed to parse query.", IgniteQueryErrorCode.PARSING, e); + throw new IgniteSQLException("Failed to parse query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java index e8932f5dd27d5..57d09f5fe1eb9 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java @@ -281,6 +281,21 @@ public void createTableIfNotExists() { sql("create table if not exists my_table (id int, val varchar)"); } + /** + * Create table using reserved word + */ + @Test + public void createTableUseReservedWord() { + assertThrows("create table table (id int primary key, val varchar)", IgniteSQLException.class, + "Failed to parse query. Encountered \"table table\""); + + sql("create table \"table\" (id int primary key, val varchar)"); + + sql("insert into \"table\" (id, val) values (0, '1')"); + + assertQuery("select * from \"table\" ").returns(0, "1").check(); + } + /** * Creates a table without a primary key and then insert a few rows. */ From 8b825687e66f42999d7f27401b7df3ff5c733c14 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 26 Jan 2023 14:33:31 +0300 Subject: [PATCH 04/13] IGNITE-18637 Java thin client: Connections balancing - Fixes #10497. Signed-off-by: Aleksey Plekhanov --- .../internal/client/thin/ReliableChannel.java | 49 ++++- .../internal/client/thin/TcpClientCache.java | 191 +++++++++++++----- .../client/thin/TcpClientTransactions.java | 9 +- .../client/thin}/FunctionalTest.java | 95 ++++++--- .../client/thin/ReliableChannelTest.java | 36 ++-- ...nClientAbstractPartitionAwarenessTest.java | 30 ++- ...ClientPartitionAwarenessBalancingTest.java | 52 +++++ ...ClientPartitionAwarenessDiscoveryTest.java | 17 +- ...PartitionAwarenessResourceReleaseTest.java | 2 +- ...tPartitionAwarenessStableTopologyTest.java | 26 +-- ...artitionAwarenessUnstableTopologyTest.java | 22 +- .../apache/ignite/client/ClientTestSuite.java | 3 + 12 files changed, 379 insertions(+), 153 deletions(-) rename modules/core/src/test/java/org/apache/ignite/{client => internal/client/thin}/FunctionalTest.java (96%) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java index d7e0b63d550f1..6207c7fcd750d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -30,7 +31,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; @@ -117,6 +120,9 @@ final class ReliableChannel implements AutoCloseable { /** Cache addresses returned by {@code ThinClientAddressFinder}. */ private volatile String[] prevHostAddrs; + /** Open channels counter. */ + private final AtomicInteger channelsCnt = new AtomicInteger(); + /** * Constructor. */ @@ -721,8 +727,18 @@ else if (holders == null) dfltChannelIdx = reinitHolders.size() - 1; } - if (dfltChannelIdx == -1) - dfltChannelIdx = 0; + if (dfltChannelIdx == -1) { + // If holder is not specified get the random holder from the range of holders with the same port. + reinitHolders.sort(Comparator.comparingInt(h -> h.getAddress().getPort())); + + int limit = 0; + int port = reinitHolders.get(0).getAddress().getPort(); + + while (limit + 1 < reinitHolders.size() && reinitHolders.get(limit + 1).getAddress().getPort() == port) + limit++; + + dfltChannelIdx = ThreadLocalRandom.current().nextInt(limit + 1); + } curChannelsGuard.writeLock().lock(); @@ -803,7 +819,21 @@ private T applyOnDefaultChannel(Function function, curChannelsGuard.readLock().lock(); try { - hld = channels.get(curChIdx); + if (!partitionAwarenessEnabled || channelsCnt.get() <= 1 || attempt != 0) + hld = channels.get(curChIdx); + else { + // Make first attempt with the random open channel. + int idx = ThreadLocalRandom.current().nextInt(channels.size()); + int idx0 = idx; + + do { + hld = channels.get(idx); + + if (++idx == channels.size()) + idx = 0; + } + while (hld.ch == null && idx != idx0); + } } finally { curChannelsGuard.readLock().unlock(); @@ -1010,6 +1040,8 @@ private ClientChannel getOrCreateChannel(boolean ignoreThrottling) } ch = channel; + + channelsCnt.incrementAndGet(); } } @@ -1024,6 +1056,8 @@ private synchronized void closeChannel() { U.closeQuiet(ch); ch = null; + + channelsCnt.decrementAndGet(); } } @@ -1055,7 +1089,7 @@ InetSocketAddress getAddress() { } /** - * Get holders reference. For test purposes.ClientOperation + * Get holders reference. For test purposes. */ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests. List getChannelHolders() { @@ -1070,6 +1104,13 @@ Map getNodeChannels() { return nodeChannels; } + /** + * Get index of current (default) channel holder. For test purposes. + */ + int getCurrentChannelIndex() { + return curChIdx; + } + /** * Get scheduledChannelsReinit reference. For test purposes. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java index fee4d55edb68d..62dd9f1d83e62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; import javax.cache.Cache; @@ -42,6 +43,7 @@ import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.client.ClientCache; import org.apache.ignite.client.ClientCacheConfiguration; +import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientDisconnectListener; import org.apache.ignite.client.ClientException; import org.apache.ignite.client.ClientFeatureNotSupportedByServerException; @@ -228,9 +230,11 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return true; - return ch.service( + TcpClientTransaction tx = transactions.tx(); + + return txAwareService(null, tx, ClientOperation.CACHE_CONTAINS_KEYS, - req -> writeKeys(keys, req), + req -> writeKeys(keys, req, tx), res -> res.in().readBoolean()); } @@ -242,9 +246,11 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return IgniteClientFutureImpl.completedFuture(true); - return ch.serviceAsync( + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, ClientOperation.CACHE_CONTAINS_KEYS, - req -> writeKeys(keys, req), + req -> writeKeys(keys, req, tx), res -> res.in().readBoolean()); } @@ -303,7 +309,12 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return new HashMap<>(); - return ch.service(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries); + TcpClientTransaction tx = transactions.tx(); + + return txAwareService(null, tx, + ClientOperation.CACHE_GET_ALL, + req -> writeKeys(keys, req, tx), + this::readEntries); } /** {@inheritDoc} */ @@ -314,7 +325,13 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return IgniteClientFutureImpl.completedFuture(new HashMap<>()); - return ch.serviceAsync(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries); + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, + ClientOperation.CACHE_GET_ALL, + req -> writeKeys(keys, req, tx), + this::readEntries); + } /** {@inheritDoc} */ @@ -325,12 +342,28 @@ public class TcpClientCache implements ClientCache { if (map.isEmpty()) return; - ch.request(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req)); + TcpClientTransaction tx = transactions.tx(); + + txAwareService(null, tx, + ClientOperation.CACHE_PUT_ALL, + req -> writeEntries(map, req, tx), + null); } /** {@inheritDoc} */ @Override public IgniteClientFuture putAllAsync(Map map) throws ClientException { - return ch.requestAsync(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req)); + if (map == null) + throw new NullPointerException("map"); + + if (map.isEmpty()) + return IgniteClientFutureImpl.completedFuture(null); + + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, + ClientOperation.CACHE_PUT_ALL, + req -> writeEntries(map, req, tx), + null); } /** {@inheritDoc} */ @@ -475,11 +508,14 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return; - ch.request( + TcpClientTransaction tx = transactions.tx(); + + txAwareService(null, tx, ClientOperation.CACHE_REMOVE_KEYS, req -> { - writeKeys(keys, req); - } + writeKeys(keys, req, tx); + }, + null ); } @@ -491,11 +527,14 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return IgniteClientFutureImpl.completedFuture(null); - return ch.requestAsync( - ClientOperation.CACHE_REMOVE_KEYS, - req -> { - writeKeys(keys, req); - } + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, + ClientOperation.CACHE_REMOVE_KEYS, + req -> { + writeKeys(keys, req, tx); + }, + null ); } @@ -707,9 +746,12 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return; - ch.request( + TcpClientTransaction tx = transactions.tx(); + + txAwareService(null, tx, ClientOperation.CACHE_CLEAR_KEYS, - req -> writeKeys(keys, req) + req -> writeKeys(keys, req, tx), + null ); } @@ -721,9 +763,12 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return IgniteClientFutureImpl.completedFuture(null); - return ch.requestAsync( + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, ClientOperation.CACHE_CLEAR_KEYS, - req -> writeKeys(keys, req) + req -> writeKeys(keys, req, tx), + null ); } @@ -1062,6 +1107,68 @@ private QueryCursor> sqlQuery(SqlQuery qry) { )); } + /** + * Execute operation on channel most suitable for transactional context. + */ + private T txAwareService( + @Nullable K affKey, + TcpClientTransaction tx, + ClientOperation op, + Consumer payloadWriter, + Function payloadReader + ) { + // Transactional operation cannot be executed on affinity node, it should be executed on node started + // the transaction. + if (tx != null) { + try { + return tx.clientChannel().service(op, payloadWriter, payloadReader); + } + catch (ClientConnectionException e) { + throw new ClientException("Transaction context has been lost due to connection errors. " + + "Cache operations are prohibited until current transaction closed.", e); + } + } + else if (affKey != null) + return ch.affinityService(cacheId, affKey, op, payloadWriter, payloadReader); + else + return ch.service(op, payloadWriter, payloadReader); + } + + /** + * Execute operation on channel most suitable for transactional context. + */ + private IgniteClientFuture txAwareServiceAsync( + @Nullable K affKey, + TcpClientTransaction tx, + ClientOperation op, + Consumer payloadWriter, + Function payloadReader + ) { + // Transactional operation cannot be executed on affinity node, it should be executed on node started + // the transaction. + if (tx != null) { + CompletableFuture fut = new CompletableFuture<>(); + + tx.clientChannel().serviceAsync(op, payloadWriter, payloadReader).whenComplete((res, err) -> { + if (err instanceof ClientConnectionException) { + fut.completeExceptionally( + new ClientException("Transaction context has been lost due to connection errors. " + + "Cache operations are prohibited until current transaction closed.", err)); + } + else if (err != null) + fut.completeExceptionally(err); + else + fut.complete(res); + }); + + return new IgniteClientFutureImpl<>(fut); + } + else if (affKey != null) + return ch.affinityServiceAsync(cacheId, affKey, op, payloadWriter, payloadReader); + else + return ch.serviceAsync(op, payloadWriter, payloadReader); + } + /** * Execute cache operation with a single key. */ @@ -1071,18 +1178,17 @@ private T cacheSingleKeyOperation( Consumer additionalPayloadWriter, Function payloadReader ) throws ClientException { + TcpClientTransaction tx = transactions.tx(); + Consumer payloadWriter = req -> { - writeCacheInfo(req); + writeCacheInfo(req, tx); writeObject(req, key); if (additionalPayloadWriter != null) additionalPayloadWriter.accept(req); }; - // Transactional operation cannot be executed on affinity node, it should be executed on node started - // the transaction. - return transactions.tx() == null ? ch.affinityService(cacheId, key, op, payloadWriter, payloadReader) : - ch.service(op, payloadWriter, payloadReader); + return txAwareService(key, tx, op, payloadWriter, payloadReader); } /** @@ -1094,31 +1200,32 @@ private IgniteClientFuture cacheSingleKeyOperationAsync( Consumer additionalPayloadWriter, Function payloadReader ) throws ClientException { + TcpClientTransaction tx = transactions.tx(); + Consumer payloadWriter = req -> { - writeCacheInfo(req); + writeCacheInfo(req, tx); writeObject(req, key); if (additionalPayloadWriter != null) additionalPayloadWriter.accept(req); }; - // Transactional operation cannot be executed on affinity node, it should be executed on node started - // the transaction. - return transactions.tx() == null - ? ch.affinityServiceAsync(cacheId, key, op, payloadWriter, payloadReader) - : ch.serviceAsync(op, payloadWriter, payloadReader); + return txAwareServiceAsync(key, tx, op, payloadWriter, payloadReader); } - /** Write cache ID and flags. */ + /** Write cache ID and flags for non-transactional operations. */ private void writeCacheInfo(PayloadOutputChannel payloadCh) { + writeCacheInfo(payloadCh, null); + } + + /** Write cache ID and flags. */ + private void writeCacheInfo(PayloadOutputChannel payloadCh, TcpClientTransaction tx) { BinaryOutputStream out = payloadCh.out(); out.writeInt(cacheId); byte flags = keepBinary ? KEEP_BINARY_FLAG_MASK : 0; - TcpClientTransaction tx = transactions.tx(); - if (expiryPlc != null) { ProtocolContext protocolCtx = payloadCh.clientChannel().protocolCtx(); @@ -1130,14 +1237,8 @@ private void writeCacheInfo(PayloadOutputChannel payloadCh) { flags |= WITH_EXPIRY_POLICY_FLAG_MASK; } - if (tx != null) { - if (tx.clientChannel() != payloadCh.clientChannel()) { - throw new ClientException("Transaction context has been lost due to connection errors. " + - "Cache operations are prohibited until current transaction closed."); - } - + if (tx != null) flags |= TRANSACTIONAL_FLAG_MASK; - } out.writeByte(flags); @@ -1177,8 +1278,8 @@ private void writeObject(PayloadOutputChannel payloadCh, Object obj) { } /** */ - private void writeKeys(Set keys, PayloadOutputChannel req) { - writeCacheInfo(req); + private void writeKeys(Set keys, PayloadOutputChannel req, TcpClientTransaction tx) { + writeCacheInfo(req, tx); ClientUtils.collection(keys, req.out(), serDes::writeObject); } @@ -1196,8 +1297,8 @@ private Map readEntries(PayloadInputChannel res) { } /** */ - private void writeEntries(Map map, PayloadOutputChannel req) { - writeCacheInfo(req); + private void writeEntries(Map map, PayloadOutputChannel req, TcpClientTransaction tx) { + writeCacheInfo(req, tx); ClientUtils.collection( map.entrySet(), req.out(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java index 3552968c37050..1b72dab3674af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientException; import org.apache.ignite.client.ClientTransaction; import org.apache.ignite.client.ClientTransactions; @@ -238,15 +239,15 @@ private TcpClientTransaction(int id, ClientChannel clientCh) { */ private void endTx(boolean committed) { try { - ch.service(ClientOperation.TX_END, + clientCh.service(ClientOperation.TX_END, req -> { - if (clientCh != req.clientChannel()) - throw new ClientException("Transaction context has been lost due to connection errors"); - req.out().writeInt(txId); req.out().writeBoolean(committed); }, null); } + catch (ClientConnectionException e) { + throw new ClientException("Transaction context has been lost due to connection errors", e); + } finally { txMap.remove(txUid); diff --git a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java similarity index 96% rename from modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java rename to modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java index 792a9980e30e4..c278fc5f3d853 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.client; +package org.apache.ignite.internal.client.thin; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; @@ -57,6 +57,16 @@ import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.client.ClientCache; +import org.apache.ignite.client.ClientCacheConfiguration; +import org.apache.ignite.client.ClientConnectionException; +import org.apache.ignite.client.ClientException; +import org.apache.ignite.client.ClientTransaction; +import org.apache.ignite.client.Comparers; +import org.apache.ignite.client.Config; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.client.LocalIgniteCluster; +import org.apache.ignite.client.Person; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; @@ -66,15 +76,12 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.AbstractBinaryArraysTest; -import org.apache.ignite.internal.client.thin.ClientServerError; import org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum; -import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy; import org.apache.ignite.internal.processors.platform.client.ClientStatus; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.mxbean.ClientProcessorMXBean; import org.apache.ignite.spi.systemview.view.SystemView; import org.apache.ignite.spi.systemview.view.TransactionView; import org.apache.ignite.testframework.GridTestUtils; @@ -828,8 +835,11 @@ public void testTxResumeAfterTxTimeout() throws Exception { */ @Test public void testTransactions() throws Exception { - try (Ignite ignite = Ignition.start(Config.getServerConfiguration()); - IgniteClient client = Ignition.startClient(getClientConfiguration()) + int clusterSize = 3; + + try (LocalIgniteCluster cluster = LocalIgniteCluster.start(clusterSize); + IgniteClient client = Ignition.startClient(getClientConfiguration() + .setAddresses(cluster.clientAddresses().toArray(new String[clusterSize]))) ) { ClientCache cache = client.createCache(new ClientCacheConfiguration() .setName("cache") @@ -937,13 +947,10 @@ public void testTransactions() throws Exception { cache.put(1, "value5"); // Test failover. - ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients", - ClientListenerProcessor.class, ClientProcessorMXBean.class); - try (ClientTransaction tx = client.transactions().txStart()) { cache.put(1, "value6"); - mxBean.dropAllConnections(); + ((TcpClientTransactions.TcpClientTransaction)tx).clientChannel().close(); try { cache.put(1, "value7"); @@ -1157,9 +1164,35 @@ public void testTransactions() throws Exception { assertEquals("value23", cache.get(0)); } + // Test that new transaction can be started after commit of the previous one without closing. + ClientTransaction tx = client.transactions().txStart(); + tx.commit(); + + tx = client.transactions().txStart(); + tx.rollback(); + + // Test that new transaction can be started after rollback of the previous one without closing. + tx = client.transactions().txStart(); + tx.commit(); + + // Test that implicit transaction started after commit of previous one without closing. + cache.put(0, "value24"); + + GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get(); + } + } + + /** + * Test transactions. + */ + @Test + public void testTransactionsLimit() throws Exception { + try (IgniteEx ignite = (IgniteEx)Ignition.start(Config.getServerConfiguration()); + IgniteClient client = Ignition.startClient(getClientConfiguration()) + ) { // Test active transactions limit. - int txLimit = ignite.configuration().getClientConnectorConfiguration().getThinClientConfiguration() - .getMaxActiveTxPerConnection(); + int txLimit = ignite.configuration().getClientConnectorConfiguration() + .getThinClientConfiguration().getMaxActiveTxPerConnection(); List txs = new ArrayList<>(txLimit); @@ -1181,22 +1214,6 @@ public void testTransactions() throws Exception { for (ClientTransaction tx : txs) tx.close(); - - // Test that new transaction can be started after commit of the previous one without closing. - ClientTransaction tx = client.transactions().txStart(); - tx.commit(); - - tx = client.transactions().txStart(); - tx.rollback(); - - // Test that new transaction can be started after rollback of the previous one without closing. - tx = client.transactions().txStart(); - tx.commit(); - - // Test that implicit transaction started after commit of previous one without closing. - cache.put(0, "value24"); - - GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get(); } } @@ -1205,8 +1222,11 @@ public void testTransactions() throws Exception { */ @Test public void testTransactionsAsync() throws Exception { - try (Ignite ignored = Ignition.start(Config.getServerConfiguration()); - IgniteClient client = Ignition.startClient(getClientConfiguration()) + int clusterSize = 3; + + try (LocalIgniteCluster cluster = LocalIgniteCluster.start(clusterSize); + IgniteClient client = Ignition.startClient(getClientConfiguration() + .setAddresses(cluster.clientAddresses().toArray(new String[clusterSize]))) ) { ClientCache cache = client.createCache(new ClientCacheConfiguration() .setName("cache") @@ -1240,6 +1260,21 @@ public void testTransactionsAsync() throws Exception { } assertEquals("value2", cache.get(1)); + + // Test multi-key operations. + try (ClientTransaction tx = client.transactions().txStart()) { + cache.putAllAsync(F.asMap(1, "value3", 2, "value3")).get(); + + assertEquals(F.asMap(1, "value3", 2, "value3"), + cache.getAllAsync(new HashSet<>(F.asList(1, 2))).get()); + + cache.removeAllAsync(new HashSet<>(F.asList(1, 2))).get(); + + assertFalse(cache.containsKeysAsync(new HashSet<>(F.asList(1, 2))).get()); + } + + assertEquals("value2", cache.get(1)); + assertFalse(cache.containsKey(2)); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java index 713979a5303b0..77038a09a7fd9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java @@ -20,9 +20,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -77,7 +79,7 @@ public void testDuplicatedAddressesAreValid() { /** * Checks that in case if address specified without port, the default port will be processed first - * */ + */ @Test public void testAddressWithoutPort() { ClientConfiguration ccfg = new ClientConfiguration().setAddresses("127.0.0.1"); @@ -88,28 +90,38 @@ public void testAddressWithoutPort() { assertEquals(ClientConnectorConfiguration.DFLT_PORT_RANGE + 1, rc.getChannelHolders().size()); - assertEquals(ClientConnectorConfiguration.DFLT_PORT, rc.getChannelHolders().iterator().next().getAddress().getPort()); + assertEquals(ClientConnectorConfiguration.DFLT_PORT, F.first(rc.getChannelHolders()).getAddress().getPort()); + + assertEquals(0, rc.getCurrentChannelIndex()); } /** - * Checks that ReliableChannel provides channels in the same order as in ClientConfiguration. - * */ + * Checks that ReliableChannel chooses random address as default from the set of addresses with the same (minimal) port. + */ @Test - public void testAddressesOrder() { - String[] addrs = new String[] {"127.0.0.1:10803", "127.0.0.1:10802", "127.0.0.1:10801", "127.0.0.1:10800"}; + public void testDefaultChannelBalancing() { + assertEquals(new HashSet<>(F.asList("127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")), + usedDefaultChannels("127.0.0.1:10801..10809", "127.0.0.2", "127.0.0.3:10800", "127.0.0.4:10800..10809")); + + assertEquals(new HashSet<>(F.asList("127.0.0.1:10800", "127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")), + usedDefaultChannels("127.0.0.1:10800", "127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")); + } + /** */ + private Set usedDefaultChannels(String... addrs) { ClientConfiguration ccfg = new ClientConfiguration().setAddresses(addrs); - ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null); + Set usedChannels = new HashSet<>(); - rc.channelsInit(); + for (int i = 0; i < 100; i++) { + ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null); - List holders = rc.getChannelHolders(); + rc.channelsInit(); - assertEquals(addrs.length, holders.size()); + usedChannels.add(rc.getChannelHolders().get(rc.getCurrentChannelIndex()).getAddress().toString()); + } - for (int i = 0; i < addrs.length; i++) - assertEquals(addrs[i], holders.get(i).getAddress().toString()); + return usedChannels; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java index ba2624aeef3e8..9ffaa31a84a0c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT; @@ -83,9 +84,6 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo /** Operations queue. */ protected final Queue> opsQueue = new ConcurrentLinkedQueue<>(); - /** Default channel. */ - protected TestTcpClientChannel dfltCh; - /** Client instance. */ protected IgniteClient client; @@ -143,7 +141,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo /** * Checks that operation goes through specified channel. */ - protected void assertOpOnChannel(TestTcpClientChannel expCh, ClientOperation expOp) { + protected void assertOpOnChannel(@Nullable TestTcpClientChannel expCh, ClientOperation expOp) { T2 nextChOp = opsQueue.poll(); assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", expOp=" + expOp + ']', nextChOp); @@ -151,8 +149,10 @@ protected void assertOpOnChannel(TestTcpClientChannel expCh, ClientOperation exp assertEquals("Unexpected operation on channel [expCh=" + expCh + ", expOp=" + expOp + ", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2()); - assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp + - ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1()); + if (expCh != null) { + assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp + + ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1()); + } } /** @@ -175,7 +175,7 @@ protected TestTcpClientChannel nodeChannel(UUID nodeId) { return channels[i]; } - return dfltCh; + return null; } /** @@ -212,25 +212,19 @@ protected void initClient(ClientConfiguration clientCfg, int... chIdxs) throws I awaitChannelsInit(chIdxs); - initDefaultChannel(); + opsQueue.clear(); } /** - * + * Trigger client to detect topology change. */ - protected void initDefaultChannel() { + protected void detectTopologyChange() { opsQueue.clear(); - // Send non-affinity request to determine default channel. + // Send non-affinity request to detect topology change. client.getOrCreateCache(REPL_CACHE_NAME); - T2 nextChOp = opsQueue.poll(); - - assertNotNull(nextChOp); - - assertEquals(nextChOp.get2(), ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME); - - dfltCh = nextChOp.get1(); + assertOpOnChannel(null, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java new file mode 100644 index 0000000000000..f89dffe8c3f97 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.junit.Test; + +/** + * Test requests to connections distribution by thin client. + */ +public class ThinClientPartitionAwarenessBalancingTest extends ThinClientAbstractPartitionAwarenessTest { + /** */ + @Test + public void testConnectionDistribution() throws Exception { + startGrids(3); + + initClient(getClientConfiguration(0, 1, 2, 3), 0, 1, 2); + + BitSet usedConnections = new BitSet(); + + for (int i = 0; i < 100; i++) + client.cacheNames(); // Non-affinity requests should be randomly distributed among connections. + + List channelList = F.asList(channels); + + while (!opsQueue.isEmpty()) { + T2 op = opsQueue.poll(); + + usedConnections.set(channelList.indexOf(op.get1())); + } + + assertEquals(BitSet.valueOf(new byte[] {7}), usedConnections); // 7 = set of {0, 1, 2} + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java index d7611842ab89b..dff4d8ea11876 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java @@ -123,12 +123,15 @@ private void testPartitionAwareness(int... chIdxs) { clientCache.put(i, i); if (i == 0) - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); assertOpOnChannel(opCh, ClientOperation.CACHE_PUT); - assertTrue(channelHits.containsKey(opCh)); - channelHits.compute(opCh, (c, old) -> true); + if (opCh != null) { + assertTrue(channelHits.containsKey(opCh)); + + channelHits.compute(opCh, (c, old) -> true); + } } assertFalse(channelHits.containsValue(false)); @@ -156,12 +159,4 @@ private ClientConfiguration getClientConfigurationWithDiscovery(int... excludeId .setAddressesFinder(addrFinder) .setPartitionAwarenessEnabled(true); } - - /** - * Trigger client to detect topology change. - */ - private void detectTopologyChange() { - // Send non-affinity request to detect topology change. - initDefaultChannel(); - } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java index 279d31865cf29..6de86591c824b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java @@ -88,7 +88,7 @@ public void testResourcesReleasedAfterCacheDestroyed() throws Exception { clientCache.put(0, 0); TestTcpClientChannel opCh = affinityChannel(0, gridCache); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); assertOpOnChannel(opCh, ClientOperation.CACHE_PUT); for (int i = 1; i < KEY_CNT; i++) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java index 5556da02ee50d..b94062b098829 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java @@ -148,8 +148,8 @@ public void testPartitionedCacheUnknownNode() throws IgniteCheckedException { clientCache.put(keyForUnknownNode, 0); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PUT); } /** @@ -191,18 +191,14 @@ public void testScanQuery() throws IgniteCheckedException { for (int i = 0; i < GRIDS_CNT; i++) { int part = grid(i).affinity(PART_CACHE_NAME).primaryPartitions(grid(i).localNode())[0]; - // Client doesn't have connection with grid(0). - TestTcpClientChannel ch = i == 0 ? dfltCh : nodeChannel(grid(i).localNode().id()); + TestTcpClientChannel ch = nodeChannel(grid(i).localNode().id()); // Test scan query with specified partition. clientCache.query(new ScanQuery<>().setPartition(part)).getAll(); + // Client doesn't have connection with grid(0), ch will be null for this grid + // and operation on any channel is acceptable. assertOpOnChannel(ch, ClientOperation.QUERY_SCAN); - - // Test scan query without specified partition. - clientCache.query(new ScanQuery<>()).getAll(); - - assertOpOnChannel(dfltCh, ClientOperation.QUERY_SCAN); } } @@ -345,17 +341,17 @@ private void testNotApplicableCache(String cacheName) { // After first response we should send partitions request on default channel together with next request. cache.put(0, 0); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PUT); for (int i = 1; i < KEY_CNT; i++) { cache.put(i, i); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT); + assertOpOnChannel(null, ClientOperation.CACHE_PUT); cache.get(i); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET); + assertOpOnChannel(null, ClientOperation.CACHE_GET); } } @@ -371,9 +367,7 @@ private void testApplicableCache(String cacheName, Function key TestTcpClientChannel opCh = affinityChannel(keyFactory.apply(0), igniteCache); - // Default channel is the first who detects topology change, so next partition request will go through - // the default channel. - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); assertOpOnChannel(opCh, ClientOperation.CACHE_PUT); for (int i = 1; i < KEY_CNT; i++) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java index e167ab1652c03..dfe93a97c6906 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java @@ -60,7 +60,7 @@ public void testPartitionAwarenessOnNodeJoin() throws Exception { awaitChannelsInit(3); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME); + assertOpOnChannel(null, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME); Integer key = primaryKey(grid(3).cache(PART_CACHE_NAME)); @@ -68,9 +68,7 @@ public void testPartitionAwarenessOnNodeJoin() throws Exception { cache.put(key, 0); - // Cache partitions are requested from default channel, since it's first (and currently the only) channel - // which detects new topology. - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); assertOpOnChannel(channels[3], ClientOperation.CACHE_PUT); @@ -98,8 +96,8 @@ public void testPartitionAwarenessOnNodeLeft() throws Exception { awaitPartitionMapExchange(); - // Next request will also detect topology change. - initDefaultChannel(); + // Detect topology change. + detectTopologyChange(); // Test partition awareness after node join. testPartitionAwareness(true); @@ -119,8 +117,8 @@ public void testConnectionLoss() throws Exception { // Test partition awareness before connection to node lost. testPartitionAwareness(true); - // Choose node to disconnect (not default node). - int disconnectNodeIdx = dfltCh == channels[0] ? 1 : 0; + // Choose node to disconnect. + int disconnectNodeIdx = 0; // Drop all thin connections from the node. getMxBean(grid(disconnectNodeIdx).name(), "Clients", @@ -137,8 +135,8 @@ public void testConnectionLoss() throws Exception { cache.put(key, 0); - // Request goes to default channel, since affinity node is disconnected. - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT); + // Request goes to the connected channel, since affinity node is disconnected. + assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT); cache.put(key, 0); @@ -175,7 +173,7 @@ public void testPartitionAwarenessOnClusterRestart() throws Exception { // Send any request to failover. client.cache(REPL_CACHE_NAME).put(0, 0); - initDefaultChannel(); + detectTopologyChange(); awaitChannelsInit(0, 1); @@ -197,7 +195,7 @@ private void testPartitionAwareness(boolean partReq) { clientCache.put(i, i); if (partReq) { - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); partReq = false; } diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java index 4a97cd5d1ee22..83b6f0fa86d11 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java @@ -24,12 +24,14 @@ import org.apache.ignite.internal.client.thin.ClusterGroupTest; import org.apache.ignite.internal.client.thin.ComputeTaskTest; import org.apache.ignite.internal.client.thin.DataReplicationOperationsTest; +import org.apache.ignite.internal.client.thin.FunctionalTest; import org.apache.ignite.internal.client.thin.IgniteSetTest; import org.apache.ignite.internal.client.thin.MetadataRegistrationTest; import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest; import org.apache.ignite.internal.client.thin.ReliableChannelTest; import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests; import org.apache.ignite.internal.client.thin.ServicesTest; +import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessBalancingTest; import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscoveryTest; import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest; import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest; @@ -69,6 +71,7 @@ ThinClientPartitionAwarenessUnstableTopologyTest.class, ThinClientPartitionAwarenessResourceReleaseTest.class, ThinClientPartitionAwarenessDiscoveryTest.class, + ThinClientPartitionAwarenessBalancingTest.class, ReliableChannelTest.class, CacheAsyncTest.class, TimeoutTest.class, From f055f9a5b9e4d52eba911429486a90de6e36c616 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Fri, 27 Jan 2023 17:37:07 +0300 Subject: [PATCH 05/13] mark grid internal --- .../internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java index 6b1c0eb8d871b..1199ca4ebe8db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.cdc.CdcFileLockHolder; import org.apache.ignite.internal.cdc.CdcMain; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorOneNodeTask; @@ -44,6 +45,7 @@ /** * Task to delete lost segment CDC links. */ +@GridInternal public class VisorCdcDeleteLostSegmentsTask extends VisorOneNodeTask { /** */ private static final long serialVersionUID = 0L; From ef68b6fff355e78060d6ca5b53caa230ee246f60 Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Mon, 30 Jan 2023 14:19:48 +0300 Subject: [PATCH 06/13] IGNITE-14450 Add Maximum CDC directory size configuration parameter (#10456) --- .../DataStorageConfiguration.java | 33 ++++++++++ .../apache/ignite/internal/cdc/CdcMain.java | 5 +- .../wal/FileWriteAheadLogManager.java | 31 ++++++++- .../org/apache/ignite/cdc/CdcSelfTest.java | 66 ++++++++++++++++++- 4 files changed, 131 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index b2b6d228588d7..8daa97a1c7b64 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -164,6 +164,9 @@ public class DataStorageConfiguration implements Serializable { /** Default change data capture directory. */ public static final String DFLT_WAL_CDC_PATH = "db/wal/cdc"; + /** Default change data capture directory maximum size. */ + public static final long DFLT_CDC_WAL_DIRECTORY_MAX_SIZE = 0; + /** Default path (relative to working directory) of binary metadata folder */ public static final String DFLT_BINARY_METADATA_PATH = "db/binary_meta"; @@ -245,6 +248,10 @@ public class DataStorageConfiguration implements Serializable { @IgniteExperimental private String cdcWalPath = DFLT_WAL_CDC_PATH; + /** Change Data Capture directory size limit. */ + @IgniteExperimental + private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE; + /** * Metrics enabled flag. * @deprecated Will be removed in upcoming releases. @@ -801,6 +808,32 @@ public DataStorageConfiguration setCdcWalPath(String cdcWalPath) { return this; } + /** + * Sets the {@link #getCdcWalPath CDC directory} maximum size in bytes. + * + * @return CDC directory maximum size in bytes. + */ + @IgniteExperimental + public long getCdcWalDirectoryMaxSize() { + return cdcWalDirMaxSize; + } + + /** + * Sets the CDC directory maximum size in bytes. Zero or negative means no limit. Creation of segment CDC link + * will be skipped when the total size of CDC files in the {@link #getCdcWalPath directory} exceeds the limit. + * The CDC application will log an error due to a gap in wal files sequence. Note that cache changes will be lost. + * Default is no limit. + * + * @param cdcWalDirMaxSize CDC directory maximum size in bytes. + * @return {@code this} for chaining. + */ + @IgniteExperimental + public DataStorageConfiguration setCdcWalDirectoryMaxSize(long cdcWalDirMaxSize) { + this.cdcWalDirMaxSize = cdcWalDirMaxSize; + + return this; + } + /** * Gets flag indicating whether persistence metrics collection is enabled. * Default value is {@link #DFLT_METRICS_ENABLED}. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index e16e1b409f3de..b7d449fa6a89a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -442,7 +442,10 @@ public void consumeWalSegmentsUntilStopped() { .peek(p -> { long nextSgmnt = segmentIndex(p); - assert lastSgmnt.get() == -1 || nextSgmnt - lastSgmnt.get() == 1; + if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) { + throw new IgniteException("Found missed segments. Some events are missed. " + + "[lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']'); + } lastSgmnt.set(nextSgmnt); }) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 0dc36ad6174c6..619a84ef0fe43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -2140,8 +2140,14 @@ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException Files.move(dstTmpFile.toPath(), dstFile.toPath()); if (walCdcDir != null) { - if (!cdcDisabled.getOrDefault(false)) - Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); + if (!cdcDisabled.getOrDefault(false)) { + if (checkCdcWalDirectorySize(dstFile.length())) + Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); + else { + log.error("Creation of segment CDC link skipped. Configured CDC directory " + + "maximum size exceeded."); + } + } else { log.warning("Creation of segment CDC link skipped. " + "'" + CDC_DISABLED + "' distributed property is 'true'."); @@ -2212,6 +2218,27 @@ public void restart() { new IgniteThread(archiver).start(); } + + /** + * @param len Length of file to check size. + * @return {@code True} if the CDC directory size check successful, otherwise {@code false}. + */ + private boolean checkCdcWalDirectorySize(long len) { + long maxDirSize = igCfg.getDataStorageConfiguration().getCdcWalDirectoryMaxSize(); + + if (maxDirSize <= 0) + return true; + + long dirSize = Arrays.stream(walCdcDir.listFiles(WAL_SEGMENT_FILE_FILTER)).mapToLong(File::length).sum(); + + if (dirSize + len <= maxDirSize) + return true; + + log.warning("Configured CDC WAL directory maximum size exceeded [curDirSize=" + dirSize + + ", fileLength=" + len + ", cdcWalDirectoryMaxSize=" + maxDirSize + ']'); + + return false; + } } /** diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 369a7663aa144..6e748c2eb9363 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -46,8 +47,11 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; @@ -55,6 +59,7 @@ import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.WithSystemProperty; @@ -68,8 +73,11 @@ import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE; import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE; import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CDC_WAL_DIRECTORY_MAX_SIZE; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static org.apache.ignite.testframework.GridTestUtils.runAsync; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.junit.Assume.assumeTrue; @@ -95,6 +103,9 @@ public class CdcSelfTest extends AbstractCdcTest { @Parameterized.Parameter(2) public boolean persistenceEnabled; + /** */ + private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE; + /** */ @Parameterized.Parameters(name = "consistentId={0}, wal={1}, persistence={2}") public static Collection parameters() { @@ -121,7 +132,8 @@ public static Collection parameters() { .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setPersistenceEnabled(persistenceEnabled) .setCdcEnabled(true)) - .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName))); + .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName)) + .setCdcWalDirectoryMaxSize(cdcWalDirMaxSize)); cfg.setCacheConfiguration( new CacheConfiguration<>(TX_CACHE_NAME) @@ -726,6 +738,58 @@ public void testDisable() throws Exception { assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT)); } + /** */ + @Test + public void testCdcDirectoryMaxSize() throws Exception { + cdcWalDirMaxSize = 10 * U.MB; + int segmentSize = (int)(cdcWalDirMaxSize / 2); + + IgniteEx ign = startGrid(0); + + ign.cluster().state(ACTIVE); + + IgniteCache cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + IgniteWriteAheadLogManager wal = ign.context().cache().context().wal(true); + File walCdcDir = U.field(ign.context().cache().context().wal(true), "walCdcDir"); + + RunnableX writeSgmnt = () -> { + int sgmnts = wal.walArchiveSegments(); + int dataSize = (int)(segmentSize * 0.8); + + for (int i = 0; i < dataSize / DFLT_PAGE_SIZE; i++) + wal.log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE], 1)); + + addData(cache, 0, 1); + + waitForCondition(() -> wal.walArchiveSegments() > sgmnts, 2 * WAL_ARCHIVE_TIMEOUT); + }; + + // Write to the WAL to exceed the configured max size. + writeSgmnt.run(); + writeSgmnt.run(); + + // The segment link creation should be skipped. + writeSgmnt.run(); + + assertTrue(cdcWalDirMaxSize >= Arrays.stream(walCdcDir.listFiles()).mapToLong(File::length).sum()); + + UserCdcConsumer cnsmr = new UserCdcConsumer(); + + CdcMain cdc = createCdc(cnsmr, getConfiguration(ign.name())); + + IgniteInternalFuture fut = runAsync(cdc); + + waitForSize(2, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + + assertFalse(fut.isDone()); + + // Write next segment after skipped. + writeSgmnt.run(); + + assertThrows(log, () -> fut.get(getTestTimeout()), IgniteCheckedException.class, + "Found missed segments. Some events are missed."); + } + /** */ public static void addData(IgniteCache cache, int from, int to) { for (int i = from; i < to; i++) From 013c20f9a259f015be9a0069aa3df5107e67f608 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 31 Jan 2023 17:19:25 +0300 Subject: [PATCH 07/13] review fixes --- docs/_docs/persistence/change-data-capture.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index 763128131b750..d713e7e8863bf 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -146,7 +146,7 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c The CDC can be disabled manually or by configured directory maximum size. In this case a hard link creation will be skipped. -NOTE: Note that cache changes in skipped segments will be lost. +WARNING: All changes in skipped segments will be lost! The CDC application fails if found missing segments. From 92427db448f699fb70ab1e79faee91271e655b8d Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 31 Jan 2023 20:41:55 +0300 Subject: [PATCH 08/13] review fixes --- .../persistence/change-data-capture.adoc | 5 +- .../commandline/CommandArgIterator.java | 15 ++++ .../internal/commandline/cdc/CdcCommand.java | 38 +++++++---- .../commandline/metric/MetricCommand.java | 15 +--- .../apache/ignite/util/CdcCommandTest.java | 68 ++++++++----------- .../cdc/VisorCdcDeleteLostSegmentsTask.java | 19 +++++- ...mmandHandlerClusterByClassTest_help.output | 8 ++- ...ndlerClusterByClassWithSSLTest_help.output | 7 +- .../zk/ZookeeperDiscoverySpiTestSuite4.java | 59 ---------------- 9 files changed, 101 insertions(+), 133 deletions(-) delete mode 100644 modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index d713e7e8863bf..93afdf859a365 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -154,7 +154,10 @@ The link:tools/control-script[Control Script] script provides the ability to del [source,shell] ---- -# Delete lost segment links on a node. +# Delete lost segment CDC links in the cluster. +control.sh|bat --cdc delete_lost_segment_links + +# Delete lost segment CDC links on a node. control.sh|bat --cdc delete_lost_segment_links --node-id node_id ---- diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java index 0e1e89b50737a..a89336c86fe1b 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.UUID; import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.NotNull; @@ -149,6 +150,20 @@ public int nextIntArg(String argName) { } } + /** @return UUID value. */ + public UUID nextUuidArg(String argName) { + String str = nextArg("Expecting " + argName); + + try { + return UUID.fromString(str); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Failed to parse " + argName + " command argument." + + " String representation of \"java.util.UUID\" is exepected. For example:" + + " 123e4567-e89b-42d3-a456-556642440000", e); + } + } + /** * @param argName Name of argument. */ diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java index d1814e2703662..67a8f268ca402 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java @@ -17,14 +17,21 @@ package org.apache.ignite.internal.commandline.cdc; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.client.GridClient; import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.client.GridClientNode; import org.apache.ignite.internal.commandline.AbstractCommand; import org.apache.ignite.internal.commandline.Command; import org.apache.ignite.internal.commandline.CommandArgIterator; import org.apache.ignite.internal.commandline.CommandLogger; +import org.apache.ignite.internal.visor.VisorTaskArgument; import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask; import static org.apache.ignite.internal.commandline.CommandList.CDC; @@ -56,6 +63,13 @@ public class CdcCommand extends AbstractCommand { clientCfg ); + Collection nodeIds = nodeId != null ? Collections.singletonList(nodeId) : + client.compute().nodes(node -> !node.isClient()).stream().map(GridClientNode::nodeId) + .collect(Collectors.toSet()); + + client.compute().execute(VisorCdcDeleteLostSegmentsTask.class.getName(), + new VisorTaskArgument<>(nodeIds, false)); + String res = "Lost segment CDC links successfully removed."; log.info(res); @@ -82,20 +96,9 @@ public class CdcCommand extends AbstractCommand { while (argIter.hasNextSubArg()) { String opt = argIter.nextArg("Failed to read command argument."); - if (NODE_ID.equalsIgnoreCase(opt)) { - try { - nodeId = UUID.fromString(argIter.nextArg("Expected node ID argument.")); - } - catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Failed to parse " + NODE_ID + " command argument." + - " String representation of \"java.util.UUID\" is exepected. For example:" + - " 123e4567-e89b-42d3-a456-556642440000", e); - } - } + if (NODE_ID.equalsIgnoreCase(opt)) + nodeId = argIter.nextUuidArg("node ID argument"); } - - if (nodeId == null) - throw new IllegalArgumentException("Expected node ID option: " + NODE_ID); } /** {@inheritDoc} */ @@ -110,8 +113,13 @@ public class CdcCommand extends AbstractCommand { /** {@inheritDoc} */ @Override public void printUsage(IgniteLogger logger) { - usage(logger, "Delete lost segment links on a node:", CDC, DELETE_LOST_SEGMENT_LINKS, NODE_ID, "node_id", - optional(CMD_AUTO_CONFIRMATION)); + Map params = new LinkedHashMap<>(); + + params.put("node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " + + "all server nodes."); + + usage(logger, "Delete lost segment CDC links:", CDC, params, DELETE_LOST_SEGMENT_LINKS, + optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION)); } /** {@inheritDoc} */ diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java index 3e8d22bf2ed29..1d4be5c73a00e 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java @@ -106,19 +106,8 @@ else if (arg().bounds() == null && arg().rateTimeInterval() < 0) MetricCommandArg cmdArg = CommandArgUtils.of(arg, MetricCommandArg.class); - if (cmdArg == NODE_ID) { - String nodeIdArg = argIter.nextArg( - "ID of the node from which metric values should be obtained is expected."); - - try { - nodeId = UUID.fromString(nodeIdArg); - } - catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Failed to parse " + NODE_ID + " command argument." + - " String representation of \"java.util.UUID\" is exepected. For example:" + - " 123e4567-e89b-42d3-a456-556642440000", e); - } - } + if (cmdArg == NODE_ID) + nodeId = argIter.nextUuidArg("ID of the node from which metric values should be obtained"); else if (cmdArg == CONFIGURE_HISTOGRAM || cmdArg == CONFIGURE_HITRATE) { if (metricName != null) { throw new IllegalArgumentException( diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index fbaae2e7ed915..322d663587a69 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -113,17 +113,13 @@ public void testParseDeleteLostSegmentLinks() { CommandList.CDC.text(), "unexpected_command"), "Unexpected command: unexpected_command"); - assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, - CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS), - "Expected node ID option: " + NODE_ID); - assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID), - "Failed to parse " + NODE_ID + " command argument."); + "Failed to parse node ID argument command argument."); assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, "10"), - "Failed to parse " + NODE_ID + " command argument."); + "Failed to parse node ID argument command argument."); } /** */ @@ -159,52 +155,39 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception { /** */ @Test public void testDeleteLostSegmentLinks() throws Exception { - archiveSegment(); - - cdcDisabled.propagate(true); - archiveSegment(); - cdcDisabled.propagate(false); - - archiveSegment(); + checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), true); + } - checkSegmentLinks(F.asList(0L, 2L), F.asList(2L)); + /** */ + @Test + public void testDeleteLostSegmentLinksOneNode() throws Exception { + checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), false); } /** */ @Test public void testDeleteLostSegmentLinksMultipleGaps() throws Exception { - archiveSegment(); - - cdcDisabled.propagate(true); - archiveSegment(); - archiveSegment(); - cdcDisabled.propagate(false); - - archiveSegment(); - - cdcDisabled.propagate(true); - archiveSegment(); - cdcDisabled.propagate(false); - - archiveSegment(); - - checkSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L)); + checkDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true); } /** */ - private void checkSegmentLinks(List expBefore, List expAfter) { - checkFiles(srv0, expBefore); - checkFiles(srv1, expBefore); + private void checkDeleteLostSegmentLinks(List expBefore, List expAfter, boolean allNodes) throws Exception { + archiveSegmentLinks(expBefore); + + checkLinks(srv0, expBefore); + checkLinks(srv1, expBefore); + + String[] args = allNodes ? new String[] {CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS} : + new String[] {CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, srv0.localNode().id().toString()}; - executeCommand(EXIT_CODE_OK, - CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, srv0.localNode().id().toString()); + executeCommand(EXIT_CODE_OK, args); - checkFiles(srv0, expAfter); - checkFiles(srv1, expBefore); // The command executed for srv0 only. + checkLinks(srv0, expAfter); + checkLinks(srv1, allNodes ? expAfter : expBefore); } /** */ - private void checkFiles(IgniteEx srv, List expLinks) { + private void checkLinks(IgniteEx srv, List expLinks) { FileWriteAheadLogManager wal0 = (FileWriteAheadLogManager)srv.context().cache().context().wal(true); File[] links = wal0.walCdcDirectory().listFiles(WAL_SEGMENT_FILE_FILTER); @@ -214,6 +197,15 @@ private void checkFiles(IgniteEx srv, List expLinks) { .allMatch(expLinks::contains); } + /** Archive given segments links with possible gaps. */ + private void archiveSegmentLinks(List idxs) throws Exception { + for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) { + cdcDisabled.propagate(!idxs.contains(idx)); + + archiveSegment(); + } + } + /** */ private void archiveSegment() throws Exception { CountDownLatch latch = new CountDownLatch(G.allGrids().size()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java index 1199ca4ebe8db..b63e18d757ae9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java @@ -23,20 +23,23 @@ import java.nio.file.Path; import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.cdc.CdcFileLockHolder; import org.apache.ignite.internal.cdc.CdcMain; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorJob; -import org.apache.ignite.internal.visor.VisorOneNodeTask; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.cdc.CdcConsumerState.WAL_STATE_FILE_NAME; import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR; @@ -46,7 +49,7 @@ * Task to delete lost segment CDC links. */ @GridInternal -public class VisorCdcDeleteLostSegmentsTask extends VisorOneNodeTask { +public class VisorCdcDeleteLostSegmentsTask extends VisorMultiNodeTask { /** */ private static final long serialVersionUID = 0L; @@ -55,6 +58,18 @@ public class VisorCdcDeleteLostSegmentsTask extends VisorOneNodeTask return new VisorCdcDeleteLostSegmentsJob(arg, false); } + /** {@inheritDoc} */ + @Override protected @Nullable Void reduce0(List results) throws IgniteException { + for (ComputeJobResult res : results) { + if (res.getException() != null) { + throw new IgniteException("Failed to delete lost segment CDC links on a node " + + "[nodeId=" + res.getNode().id() + ']', res.getException()); + } + } + + return null; + } + /** */ private static class VisorCdcDeleteLostSegmentsJob extends VisorJob { /** */ diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index 80ae8efae3e27..15ec84a3e8cc3 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -344,12 +344,14 @@ If the file name isn't specified the output file name is: '.bin' control.(sh|bat) --consistency finalize [EXPERIMENTAL] - Delete lost segment links on a node: - control.(sh|bat) --cdc delete_lost_segment_links --node-id node_id [--yes] + Delete lost segment CDC links: + control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] + + Parameters: + node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes. By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. - Default values: HOST_OR_IP=127.0.0.1 PORT=11211 diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output index 80ae8efae3e27..0f27ca2680b41 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output @@ -344,8 +344,11 @@ If the file name isn't specified the output file name is: '.bin' control.(sh|bat) --consistency finalize [EXPERIMENTAL] - Delete lost segment links on a node: - control.(sh|bat) --cdc delete_lost_segment_links --node-id node_id [--yes] + Delete lost segment CDC links: + control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] + + Parameters: + node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes. By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java deleted file mode 100644 index e42adaf25c478..0000000000000 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk; - -import org.apache.ignite.common.CacheCreateDestroyEventSecurityContextTest; -import org.apache.ignite.internal.ClusterNodeMetricsUpdateTest; -import org.apache.ignite.internal.IgniteNodeValidationFailedEventTest; -import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAtomicSelfTest; -import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryTransactionalSelfTest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicMultiNodeFullApiSelfTest; -import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicMultiNodeFullApiSelfTest; -import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest; -import org.apache.ignite.internal.processors.metastorage.DistributedMetaStoragePersistentTest; -import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorageTest; -import org.apache.ignite.spi.discovery.DiscoverySpiDataExchangeTest; -import org.junit.BeforeClass; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -/** - * Regular Ignite tests executed with {@link ZookeeperDiscoverySpi}. - */ -@RunWith(Suite.class) -@Suite.SuiteClasses({ - ZookeeperDiscoverySuitePreprocessorTest.class, - IgniteCachePutRetryAtomicSelfTest.class, - IgniteCachePutRetryTransactionalSelfTest.class, - ClusterNodeMetricsUpdateTest.class, - GridCacheAtomicMultiNodeFullApiSelfTest.class, - GridCacheReplicatedAtomicMultiNodeFullApiSelfTest.class, - IgniteCacheReplicatedQuerySelfTest.class, - DistributedMetaStorageTest.class, - DistributedMetaStoragePersistentTest.class, - IgniteNodeValidationFailedEventTest.class, - DiscoverySpiDataExchangeTest.class, - CacheCreateDestroyEventSecurityContextTest.class -}) -public class ZookeeperDiscoverySpiTestSuite4 { - /** */ - @BeforeClass - public static void init() throws Exception { - ZookeeperDiscoverySpiTestConfigurator.initTestSuite(); - } -} From 7501fbe0264517e8bfa60be91684bc7aeabd03a4 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 31 Jan 2023 20:56:37 +0300 Subject: [PATCH 09/13] review fixes --- .../ignite/internal/commandline/CommandArgIterator.java | 2 +- .../apache/ignite/internal/commandline/cdc/CdcCommand.java | 2 +- .../ignite/internal/commandline/metric/MetricCommand.java | 2 +- .../src/test/java/org/apache/ignite/util/CdcCommandTest.java | 4 ++-- .../test/java/org/apache/ignite/util/MetricCommandTest.java | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java index a89336c86fe1b..892f87e01661b 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java @@ -152,7 +152,7 @@ public int nextIntArg(String argName) { /** @return UUID value. */ public UUID nextUuidArg(String argName) { - String str = nextArg("Expecting " + argName); + String str = nextArg("Expecting " + argName + " command argument."); try { return UUID.fromString(str); diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java index 67a8f268ca402..a61218daedec5 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java @@ -97,7 +97,7 @@ public class CdcCommand extends AbstractCommand { String opt = argIter.nextArg("Failed to read command argument."); if (NODE_ID.equalsIgnoreCase(opt)) - nodeId = argIter.nextUuidArg("node ID argument"); + nodeId = argIter.nextUuidArg(NODE_ID); } } diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java index 1d4be5c73a00e..6dd8c600387b7 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java @@ -107,7 +107,7 @@ else if (arg().bounds() == null && arg().rateTimeInterval() < 0) MetricCommandArg cmdArg = CommandArgUtils.of(arg, MetricCommandArg.class); if (cmdArg == NODE_ID) - nodeId = argIter.nextUuidArg("ID of the node from which metric values should be obtained"); + nodeId = argIter.nextUuidArg(NODE_ID.argName()); else if (cmdArg == CONFIGURE_HISTOGRAM || cmdArg == CONFIGURE_HITRATE) { if (metricName != null) { throw new IllegalArgumentException( diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index 322d663587a69..d920e08ab299b 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -115,11 +115,11 @@ public void testParseDeleteLostSegmentLinks() { assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID), - "Failed to parse node ID argument command argument."); + "Failed to parse " + NODE_ID + " command argument."); assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, "10"), - "Failed to parse node ID argument command argument."); + "Failed to parse " + NODE_ID + " command argument."); } /** */ diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java index beb7aab002bdc..ae5b01e07eed7 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java @@ -75,7 +75,7 @@ public void testMetricNameMissedFailure() { @Test public void testNodeIdMissedFailure() { assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, CMD_METRIC, SYS_METRICS, NODE_ID.argName()), - "ID of the node from which metric values should be obtained is expected."); + "Expecting " + NODE_ID.argName() + " command argument."); } /** Tests command error output in case value of {@link MetricCommandArg#NODE_ID} argument is invalid.*/ From e328ff9f144213b38f1853802a5b128a5a7ba5ff Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 31 Jan 2023 20:59:28 +0300 Subject: [PATCH 10/13] review fixes --- .../zk/ZookeeperDiscoverySpiTestSuite4.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java new file mode 100644 index 0000000000000..e42adaf25c478 --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite4.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk; + +import org.apache.ignite.common.CacheCreateDestroyEventSecurityContextTest; +import org.apache.ignite.internal.ClusterNodeMetricsUpdateTest; +import org.apache.ignite.internal.IgniteNodeValidationFailedEventTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAtomicSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryTransactionalSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicMultiNodeFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicMultiNodeFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest; +import org.apache.ignite.internal.processors.metastorage.DistributedMetaStoragePersistentTest; +import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorageTest; +import org.apache.ignite.spi.discovery.DiscoverySpiDataExchangeTest; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Regular Ignite tests executed with {@link ZookeeperDiscoverySpi}. + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + ZookeeperDiscoverySuitePreprocessorTest.class, + IgniteCachePutRetryAtomicSelfTest.class, + IgniteCachePutRetryTransactionalSelfTest.class, + ClusterNodeMetricsUpdateTest.class, + GridCacheAtomicMultiNodeFullApiSelfTest.class, + GridCacheReplicatedAtomicMultiNodeFullApiSelfTest.class, + IgniteCacheReplicatedQuerySelfTest.class, + DistributedMetaStorageTest.class, + DistributedMetaStoragePersistentTest.class, + IgniteNodeValidationFailedEventTest.class, + DiscoverySpiDataExchangeTest.class, + CacheCreateDestroyEventSecurityContextTest.class +}) +public class ZookeeperDiscoverySpiTestSuite4 { + /** */ + @BeforeClass + public static void init() throws Exception { + ZookeeperDiscoverySpiTestConfigurator.initTestSuite(); + } +} From 85de09a9f4ed280f46da517455b0912b44181696 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 31 Jan 2023 21:00:52 +0300 Subject: [PATCH 11/13] fix test --- .../GridCommandHandlerClusterByClassTest_help.output | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index 15ec84a3e8cc3..0f27ca2680b41 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -352,6 +352,7 @@ If the file name isn't specified the output file name is: '.bin' By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. + Default values: HOST_OR_IP=127.0.0.1 PORT=11211 From de925fd58f8cb3cc46003c89d2c447ee4d02e8ae Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Wed, 1 Feb 2023 10:28:48 +0300 Subject: [PATCH 12/13] review fixes --- docs/_docs/persistence/change-data-capture.adoc | 1 + .../apache/ignite/internal/commandline/cdc/CdcCommand.java | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index 93afdf859a365..e4d5253b14505 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -145,6 +145,7 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c == Handling skipped segments The CDC can be disabled manually or by configured directory maximum size. In this case a hard link creation will be skipped. +So when enabled there will be gap between segments: `0000000000000002.bin`, `0000000000000010.bin`, `0000000000000011.bin`, for example. WARNING: All changes in skipped segments will be lost! diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java index a61218daedec5..e60f73dc3cdbf 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java @@ -103,7 +103,9 @@ public class CdcCommand extends AbstractCommand { /** {@inheritDoc} */ @Override public String confirmationPrompt() { - return "Warning: the command will delete lost segment CDC links. Cache events from these segments will be lost."; + return "Warning: The command will fix WAL segments gap in case CDC link creation was stopped by distributed " + + "property or excess of maximum CDC directory size. Gap will be fixed by deletion of WAL segments " + + "previous to the gap."; } /** {@inheritDoc} */ From 5b75cfa5fa0bf3ef80cb15d4c638689d785f6173 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 2 Feb 2023 10:09:13 +0300 Subject: [PATCH 13/13] review fixes --- docs/_docs/persistence/change-data-capture.adoc | 15 ++++++++++++--- .../internal/commandline/cdc/CdcCommand.java | 8 ++++++-- .../org/apache/ignite/internal/cdc/CdcMain.java | 2 +- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index e4d5253b14505..f04fffaee55ab 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -145,13 +145,16 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c == Handling skipped segments The CDC can be disabled manually or by configured directory maximum size. In this case a hard link creation will be skipped. -So when enabled there will be gap between segments: `0000000000000002.bin`, `0000000000000010.bin`, `0000000000000011.bin`, for example. WARNING: All changes in skipped segments will be lost! -The CDC application fails if found missing segments. +So when enabled there will be gap between segments: `0000000000000002.wal`, `0000000000000010.wal`, `0000000000000011.wal`, for example. +In this case `ignite-cdc.sh` will fail with the something like "Found missed segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]". -The link:tools/control-script[Control Script] script provides the ability to delete lost segments before a last gap in segments: +NOTE: Make sure you need to sync data before restarting the CDC application. You can synchronize caches using +snapshot or other methods. + +To fix this error you can run the following link:tools/control-script[Control Script] command: [source,shell] ---- @@ -162,6 +165,12 @@ control.sh|bat --cdc delete_lost_segment_links control.sh|bat --cdc delete_lost_segment_links --node-id node_id ---- +The command will remove all segment links before the last gap. + +For example, CDC was turned off several times: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`, `0000000000000010.wal`, `0000000000000011.wal` +Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`. +The application will start from the `0000000000000010.wal` segment after being enabled. + == cdc-ext Ignite extensions project has link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[cdc-ext] module which provides two way to setup cross cluster replication based on CDC. diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java index e60f73dc3cdbf..8dbd2b963abd6 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.commandline.Command; import org.apache.ignite.internal.commandline.CommandArgIterator; import org.apache.ignite.internal.commandline.CommandLogger; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorTaskArgument; import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask; @@ -104,8 +105,11 @@ public class CdcCommand extends AbstractCommand { /** {@inheritDoc} */ @Override public String confirmationPrompt() { return "Warning: The command will fix WAL segments gap in case CDC link creation was stopped by distributed " + - "property or excess of maximum CDC directory size. Gap will be fixed by deletion of WAL segments " + - "previous to the gap."; + "property or excess of maximum CDC directory size. Gap will be fixed by deletion of WAL segment links" + + "previous to the last gap." + U.nl() + + "All changes in deleted segment links will be lost!" + U.nl() + + "Make sure you need to sync data before restarting the CDC application. You can synchronize caches " + + "using snapshot or other methods."; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 18e768932b0da..06802d36534b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -443,7 +443,7 @@ public void consumeWalSegmentsUntilStopped() { long nextSgmnt = segmentIndex(p); if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) { - throw new IgniteException("Found missed segments. Some events are missed. " + + throw new IgniteException("Found missed segments. Some events are missed. Exiting! " + "[lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']'); }