diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index adbfe1e47063a..f04fffaee55ab 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -142,7 +142,36 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c 5. Infinitely wait for the newly available segment and process it. 6. Stop the consumer in case of a failure or a received stop signal. +== Handling skipped segments + +The CDC can be disabled manually or by configured directory maximum size. In this case a hard link creation will be skipped. + +WARNING: All changes in skipped segments will be lost! + +So when enabled there will be gap between segments: `0000000000000002.wal`, `0000000000000010.wal`, `0000000000000011.wal`, for example. +In this case `ignite-cdc.sh` will fail with the something like "Found missed segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]". + +NOTE: Make sure you need to sync data before restarting the CDC application. You can synchronize caches using +snapshot or other methods. + +To fix this error you can run the following link:tools/control-script[Control Script] command: + +[source,shell] +---- +# Delete lost segment CDC links in the cluster. +control.sh|bat --cdc delete_lost_segment_links + +# Delete lost segment CDC links on a node. +control.sh|bat --cdc delete_lost_segment_links --node-id node_id +---- + +The command will remove all segment links before the last gap. + +For example, CDC was turned off several times: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`, `0000000000000010.wal`, `0000000000000011.wal` +Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`. +The application will start from the `0000000000000010.wal` segment after being enabled. + == cdc-ext Ignite extensions project has link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[cdc-ext] module which provides two way to setup cross cluster replication based on CDC. -Detailed documentation can be found on link:extensions-and-integrations/change-data-capture-extensions[page]. \ No newline at end of file +Detailed documentation can be found on link:extensions-and-integrations/change-data-capture-extensions[page]. diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index fb51d5e9586df..442ca86f000cd 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -204,7 +204,7 @@ public static SqlNodeList parse(String qry, SqlParser.Config parserCfg) { return parse(new SourceStringReader(qry), parserCfg); } catch (SqlParseException e) { - throw new IgniteSQLException("Failed to parse query.", IgniteQueryErrorCode.PARSING, e); + throw new IgniteSQLException("Failed to parse query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java index e8932f5dd27d5..57d09f5fe1eb9 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java @@ -281,6 +281,21 @@ public void createTableIfNotExists() { sql("create table if not exists my_table (id int, val varchar)"); } + /** + * Create table using reserved word + */ + @Test + public void createTableUseReservedWord() { + assertThrows("create table table (id int primary key, val varchar)", IgniteSQLException.class, + "Failed to parse query. Encountered \"table table\""); + + sql("create table \"table\" (id int primary key, val varchar)"); + + sql("insert into \"table\" (id, val) values (0, '1')"); + + assertQuery("select * from \"table\" ").returns(0, "1").check(); + } + /** * Creates a table without a primary key and then insert a few rows. */ diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java index 0e1e89b50737a..892f87e01661b 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandArgIterator.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.UUID; import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.NotNull; @@ -149,6 +150,20 @@ public int nextIntArg(String argName) { } } + /** @return UUID value. */ + public UUID nextUuidArg(String argName) { + String str = nextArg("Expecting " + argName + " command argument."); + + try { + return UUID.fromString(str); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Failed to parse " + argName + " command argument." + + " String representation of \"java.util.UUID\" is exepected. For example:" + + " 123e4567-e89b-42d3-a456-556642440000", e); + } + } + /** * @param argName Name of argument. */ diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java index df93519469f12..2c7b78ce9bd7a 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.commandline; import org.apache.ignite.internal.commandline.cache.CacheCommands; +import org.apache.ignite.internal.commandline.cdc.CdcCommand; import org.apache.ignite.internal.commandline.consistency.ConsistencyCommand; import org.apache.ignite.internal.commandline.diagnostic.DiagnosticCommand; import org.apache.ignite.internal.commandline.encryption.EncryptionCommands; @@ -103,7 +104,10 @@ public enum CommandList { PERFORMANCE_STATISTICS("--performance-statistics", new PerformanceStatisticsCommand()), /** Command to check/repair consistency. */ - CONSISTENCY("--consistency", new ConsistencyCommand()); + CONSISTENCY("--consistency", new ConsistencyCommand()), + + /** Cdc commands. */ + CDC("--cdc", new CdcCommand()); /** Private values copy so there's no need in cloning it every time. */ private static final CommandList[] VALUES = CommandList.values(); diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java index 2acdc404052ba..238a3a7804620 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommonArgParser.java @@ -57,7 +57,7 @@ public class CommonArgParser { static final String CMD_USER = "--user"; /** Option is used for auto confirmation. */ - static final String CMD_AUTO_CONFIRMATION = "--yes"; + public static final String CMD_AUTO_CONFIRMATION = "--yes"; /** */ static final String CMD_PING_INTERVAL = "--ping-interval"; diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java new file mode 100644 index 0000000000000..8dbd2b963abd6 --- /dev/null +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cdc/CdcCommand.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.commandline.cdc; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.client.GridClientNode; +import org.apache.ignite.internal.commandline.AbstractCommand; +import org.apache.ignite.internal.commandline.Command; +import org.apache.ignite.internal.commandline.CommandArgIterator; +import org.apache.ignite.internal.commandline.CommandLogger; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask; + +import static org.apache.ignite.internal.commandline.CommandList.CDC; +import static org.apache.ignite.internal.commandline.CommandLogger.optional; +import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION; +import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode; + +/** + * CDC command. + */ +public class CdcCommand extends AbstractCommand { + /** Command to delete lost segment links. */ + public static final String DELETE_LOST_SEGMENT_LINKS = "delete_lost_segment_links"; + + /** */ + public static final String NODE_ID = "--node-id"; + + /** Node ID. */ + private UUID nodeId; + + /** {@inheritDoc} */ + @Override public Object execute(GridClientConfiguration clientCfg, IgniteLogger log) throws Exception { + try (GridClient client = Command.startClient(clientCfg)) { + executeTaskByNameOnNode( + client, + VisorCdcDeleteLostSegmentsTask.class.getName(), + null, + nodeId, + clientCfg + ); + + Collection nodeIds = nodeId != null ? Collections.singletonList(nodeId) : + client.compute().nodes(node -> !node.isClient()).stream().map(GridClientNode::nodeId) + .collect(Collectors.toSet()); + + client.compute().execute(VisorCdcDeleteLostSegmentsTask.class.getName(), + new VisorTaskArgument<>(nodeIds, false)); + + String res = "Lost segment CDC links successfully removed."; + + log.info(res); + + return res; + } + catch (Throwable e) { + log.error("Failed to perform operation."); + log.error(CommandLogger.errorMessage(e)); + + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void parseArguments(CommandArgIterator argIter) { + nodeId = null; + + String cmd = argIter.nextArg("Expected command: " + DELETE_LOST_SEGMENT_LINKS); + + if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd)) + throw new IllegalArgumentException("Unexpected command: " + cmd); + + while (argIter.hasNextSubArg()) { + String opt = argIter.nextArg("Failed to read command argument."); + + if (NODE_ID.equalsIgnoreCase(opt)) + nodeId = argIter.nextUuidArg(NODE_ID); + } + } + + /** {@inheritDoc} */ + @Override public String confirmationPrompt() { + return "Warning: The command will fix WAL segments gap in case CDC link creation was stopped by distributed " + + "property or excess of maximum CDC directory size. Gap will be fixed by deletion of WAL segment links" + + "previous to the last gap." + U.nl() + + "All changes in deleted segment links will be lost!" + U.nl() + + "Make sure you need to sync data before restarting the CDC application. You can synchronize caches " + + "using snapshot or other methods."; + } + + /** {@inheritDoc} */ + @Override public String arg() { + return null; + } + + /** {@inheritDoc} */ + @Override public void printUsage(IgniteLogger logger) { + Map params = new LinkedHashMap<>(); + + params.put("node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " + + "all server nodes."); + + usage(logger, "Delete lost segment CDC links:", CDC, params, DELETE_LOST_SEGMENT_LINKS, + optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION)); + } + + /** {@inheritDoc} */ + @Override public String name() { + return "cdc"; + } + + /** {@inheritDoc} */ + @Override public boolean experimental() { + return true; + } +} diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java index 3e8d22bf2ed29..6dd8c600387b7 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/metric/MetricCommand.java @@ -106,19 +106,8 @@ else if (arg().bounds() == null && arg().rateTimeInterval() < 0) MetricCommandArg cmdArg = CommandArgUtils.of(arg, MetricCommandArg.class); - if (cmdArg == NODE_ID) { - String nodeIdArg = argIter.nextArg( - "ID of the node from which metric values should be obtained is expected."); - - try { - nodeId = UUID.fromString(nodeIdArg); - } - catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Failed to parse " + NODE_ID + " command argument." + - " String representation of \"java.util.UUID\" is exepected. For example:" + - " 123e4567-e89b-42d3-a456-556642440000", e); - } - } + if (cmdArg == NODE_ID) + nodeId = argIter.nextUuidArg(NODE_ID.argName()); else if (cmdArg == CONFIGURE_HISTOGRAM || cmdArg == CONFIGURE_HITRATE) { if (metricName != null) { throw new IllegalArgumentException( diff --git a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java index fa478da5fd249..c2ee56ce044b7 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java @@ -64,6 +64,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND; import static org.apache.ignite.internal.QueryMXBeanImpl.EXPECTED_GLOBAL_QRY_ID_FORMAT; import static org.apache.ignite.internal.commandline.CommandList.CACHE; +import static org.apache.ignite.internal.commandline.CommandList.CDC; import static org.apache.ignite.internal.commandline.CommandList.CLUSTER_CHANGE_TAG; import static org.apache.ignite.internal.commandline.CommandList.SET_STATE; import static org.apache.ignite.internal.commandline.CommandList.SHUTDOWN_POLICY; @@ -78,6 +79,8 @@ import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES; import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST; import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -412,6 +415,8 @@ else if (cmdL == CLUSTER_CHANGE_TAG) args = parseArgs(asList(cmdL.text(), "newTagValue")); else if (cmdL == WARM_UP) args = parseArgs(asList(cmdL.text(), "--stop")); + else if (cmdL == CDC) + args = parseArgs(asList(cmdL.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, UUID.randomUUID().toString())); else args = parseArgs(asList(cmdL.text())); @@ -497,6 +502,15 @@ else if (cmdL == WARM_UP) break; } + case CDC: { + args = parseArgs(asList(cmdL.text(), DELETE_LOST_SEGMENT_LINKS, + NODE_ID, UUID.randomUUID().toString(), "--yes")); + + checkCommonParametersCorrectlyParsed(cmdL, args, true); + + break; + } + default: fail("Unknown command: " + cmd); } @@ -1232,6 +1246,7 @@ private boolean requireArgs(@Nullable CommandList cmd) { cmd == CommandList.METRIC || cmd == CommandList.DEFRAGMENTATION || cmd == CommandList.PERFORMANCE_STATISTICS || - cmd == CommandList.CONSISTENCY; + cmd == CommandList.CONSISTENCY || + cmd == CDC; } } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index 3b7ac34cb5e47..ce8571c4ef42e 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -19,6 +19,7 @@ import org.apache.ignite.internal.commandline.indexreader.IgniteIndexReaderTest; import org.apache.ignite.util.CacheMetricsCommandTest; +import org.apache.ignite.util.CdcCommandTest; import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest; import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest; import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest; @@ -64,7 +65,9 @@ PerformanceStatisticsCommandTest.class, CacheMetricsCommandTest.class, - IgniteIndexReaderTest.class + IgniteIndexReaderTest.class, + + CdcCommandTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java new file mode 100644 index 0000000000000..d920e08ab299b --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.io.File; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.cdc.AbstractCdcTest; +import org.apache.ignite.cdc.CdcConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.commandline.CommandList; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT; +import static org.apache.ignite.cdc.CdcSelfTest.addData; +import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; +import static org.apache.ignite.testframework.GridTestUtils.assertContains; + +/** + * CDC command tests. + */ +public class CdcCommandTest extends GridCommandHandlerAbstractTest { + /** */ + private IgniteEx srv0; + + /** */ + private IgniteEx srv1; + + /** */ + private DistributedChangeableProperty cdcDisabled; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setBackups(1)); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setWalForceArchiveTimeout(1000) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setCdcEnabled(true))); + + cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + + srv0 = startGrid(0); + srv1 = startGrid(1); + + cdcDisabled = srv0.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void testParseDeleteLostSegmentLinks() { + injectTestSystemOut(); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), "unexpected_command"), + "Unexpected command: unexpected_command"); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID), + "Failed to parse " + NODE_ID + " command argument."); + + assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, + CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, "10"), + "Failed to parse " + NODE_ID + " command argument."); + } + + /** */ + @Test + public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception { + injectTestSystemOut(); + + CountDownLatch appStarted = new CountDownLatch(1); + + CdcConfiguration cfg = new CdcConfiguration(); + + cfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() { + @Override public void start(MetricRegistry mreg) { + appStarted.countDown(); + } + }); + + CdcMain cdc = new CdcMain(getConfiguration(getTestIgniteInstanceName(0)), null, cfg); + + IgniteInternalFuture fut = GridTestUtils.runAsync(cdc); + + appStarted.await(getTestTimeout(), TimeUnit.MILLISECONDS); + + assertContains(log, executeCommand(EXIT_CODE_UNEXPECTED_ERROR, + CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, srv0.localNode().id().toString()), + "Failed to delete lost segment CDC links. Unable to acquire lock to lock CDC folder."); + + assertFalse(fut.isDone()); + + fut.cancel(); + } + + /** */ + @Test + public void testDeleteLostSegmentLinks() throws Exception { + checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), true); + } + + /** */ + @Test + public void testDeleteLostSegmentLinksOneNode() throws Exception { + checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), false); + } + + /** */ + @Test + public void testDeleteLostSegmentLinksMultipleGaps() throws Exception { + checkDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true); + } + + /** */ + private void checkDeleteLostSegmentLinks(List expBefore, List expAfter, boolean allNodes) throws Exception { + archiveSegmentLinks(expBefore); + + checkLinks(srv0, expBefore); + checkLinks(srv1, expBefore); + + String[] args = allNodes ? new String[] {CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS} : + new String[] {CommandList.CDC.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, srv0.localNode().id().toString()}; + + executeCommand(EXIT_CODE_OK, args); + + checkLinks(srv0, expAfter); + checkLinks(srv1, allNodes ? expAfter : expBefore); + } + + /** */ + private void checkLinks(IgniteEx srv, List expLinks) { + FileWriteAheadLogManager wal0 = (FileWriteAheadLogManager)srv.context().cache().context().wal(true); + + File[] links = wal0.walCdcDirectory().listFiles(WAL_SEGMENT_FILE_FILTER); + + assertEquals(expLinks.size(), links.length); + Arrays.stream(links).map(File::toPath).map(CdcMain::segmentIndex) + .allMatch(expLinks::contains); + } + + /** Archive given segments links with possible gaps. */ + private void archiveSegmentLinks(List idxs) throws Exception { + for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) { + cdcDisabled.propagate(!idxs.contains(idx)); + + archiveSegment(); + } + } + + /** */ + private void archiveSegment() throws Exception { + CountDownLatch latch = new CountDownLatch(G.allGrids().size()); + + for (Ignite srv : G.allGrids()) { + srv.events().localListen(evt -> { + latch.countDown(); + + return false; + }, EVT_WAL_SEGMENT_ARCHIVED); + } + + addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java index 3dc6ed6e2145c..e3555a2e31074 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java @@ -314,6 +314,21 @@ protected int execute(String... args) { return execute(new ArrayList<>(asList(args))); } + /** + * Executes command and checks its exit code. + * + * @param expExitCode Expected exit code. + * @param args Command lines arguments. + * @return Result of command execution. + */ + protected String executeCommand(int expExitCode, String... args) { + int res = execute(args); + + assertEquals(expExitCode, res); + + return testOut.toString(); + } + /** * Before command executed {@link #testOut} reset. * diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java index d40f05bfcbb13..8bf760eed5137 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClusterByClassTest.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.EnumMap; import java.util.HashSet; import java.util.List; @@ -92,6 +91,7 @@ import static java.util.Arrays.asList; import static java.util.Arrays.stream; +import static java.util.Collections.singletonList; import static java.util.Objects.nonNull; import static java.util.stream.Collectors.toList; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND; @@ -104,6 +104,7 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; import static org.apache.ignite.internal.commandline.CommandHandler.UTILITY_NAME; import static org.apache.ignite.internal.commandline.CommandList.BASELINE; +import static org.apache.ignite.internal.commandline.CommandList.CDC; import static org.apache.ignite.internal.commandline.CommandList.CONSISTENCY; import static org.apache.ignite.internal.commandline.CommandList.METADATA; import static org.apache.ignite.internal.commandline.CommandList.TRACING_CONFIGURATION; @@ -115,6 +116,8 @@ import static org.apache.ignite.internal.commandline.cache.CacheDestroy.DESTROY_ALL_ARG; import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.DESTROY; import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.HELP; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS; +import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.CACHE; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.PARTITIONS; import static org.apache.ignite.internal.commandline.consistency.ConsistencyCommand.STRATEGY; @@ -1729,11 +1732,12 @@ public void testContainsWarnInsteadExecExperimentalCmdWhenEnableExperimentalFals cmdArgs.put(WAL, asList(new String[] {"print"}, new String[] {"delete"})); cmdArgs.put(METADATA, asList(new String[] {"help"}, new String[] {"list"})); - cmdArgs.put(TRACING_CONFIGURATION, Collections.singletonList(new String[] {"get_all"})); + cmdArgs.put(TRACING_CONFIGURATION, singletonList(new String[] {"get_all"})); cmdArgs.put(CONSISTENCY, asList( new String[] {"repair", CACHE, "cache", PARTITIONS, "0", STRATEGY, "LWW"}, new String[] {"status"}, new String[] {"finalize"})); + cmdArgs.put(CDC, singletonList(new String[] {DELETE_LOST_SEGMENT_LINKS, NODE_ID, UUID.randomUUID().toString()})); String warning = String.format( "To use experimental command add --enable-experimental parameter for %s", diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java index b173fad8ddbb4..ae5b01e07eed7 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/MetricCommandTest.java @@ -75,7 +75,7 @@ public void testMetricNameMissedFailure() { @Test public void testNodeIdMissedFailure() { assertContains(log, executeCommand(EXIT_CODE_INVALID_ARGUMENTS, CMD_METRIC, SYS_METRICS, NODE_ID.argName()), - "ID of the node from which metric values should be obtained is expected."); + "Expecting " + NODE_ID.argName() + " command argument."); } /** Tests command error output in case value of {@link MetricCommandArg#NODE_ID} argument is invalid.*/ @@ -465,19 +465,4 @@ private Map parseMetricCommandOutput(String out) { return res; } - - /** - * Executes command and checks its exit code. - * - * @param expExitCode Expected exit code. - * @param args Command lines arguments. - * @return Result of command execution. - */ - private String executeCommand(int expExitCode, String... args) { - int res = execute(args); - - assertEquals(expExitCode, res); - - return testOut.toString(); - } } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java index 67ce487de1e8b..bf788fc6b277a 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/SystemViewCommandTest.java @@ -1303,19 +1303,4 @@ private Map>> parseSystemViewCommandOutput(String out) { return res; } - - /** - * Executes command and checks its exit code. - * - * @param expExitCode Expected exit code. - * @param args Command lines arguments. - * @return Result of command execution. - */ - private String executeCommand(int expExitCode, String... args) { - int res = execute(args); - - assertEquals(expExitCode, res); - - return testOut.toString(); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index b7d449fa6a89a..06802d36534b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -438,12 +438,12 @@ public void consumeWalSegmentsUntilStopped() { // Need unseen WAL segments only. .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p)) .peek(seen::add) // Adds to seen. - .sorted(Comparator.comparingLong(this::segmentIndex)) // Sort by segment index. + .sorted(Comparator.comparingLong(CdcMain::segmentIndex)) // Sort by segment index. .peek(p -> { long nextSgmnt = segmentIndex(p); if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) { - throw new IgniteException("Found missed segments. Some events are missed. " + + throw new IgniteException("Found missed segments. Some events are missed. Exiting! " + "[lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']'); } @@ -810,7 +810,7 @@ private CdcFileLockHolder tryLock(File dbStoreDirWithSubdirectory) { * @param segment WAL segment file. * @return Segment index. */ - public long segmentIndex(Path segment) { + public static long segmentIndex(Path segment) { String fn = segment.getFileName().toString(); return Long.parseLong(fn.substring(0, fn.indexOf('.'))); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java index d7e0b63d550f1..6207c7fcd750d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -30,7 +31,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; @@ -117,6 +120,9 @@ final class ReliableChannel implements AutoCloseable { /** Cache addresses returned by {@code ThinClientAddressFinder}. */ private volatile String[] prevHostAddrs; + /** Open channels counter. */ + private final AtomicInteger channelsCnt = new AtomicInteger(); + /** * Constructor. */ @@ -721,8 +727,18 @@ else if (holders == null) dfltChannelIdx = reinitHolders.size() - 1; } - if (dfltChannelIdx == -1) - dfltChannelIdx = 0; + if (dfltChannelIdx == -1) { + // If holder is not specified get the random holder from the range of holders with the same port. + reinitHolders.sort(Comparator.comparingInt(h -> h.getAddress().getPort())); + + int limit = 0; + int port = reinitHolders.get(0).getAddress().getPort(); + + while (limit + 1 < reinitHolders.size() && reinitHolders.get(limit + 1).getAddress().getPort() == port) + limit++; + + dfltChannelIdx = ThreadLocalRandom.current().nextInt(limit + 1); + } curChannelsGuard.writeLock().lock(); @@ -803,7 +819,21 @@ private T applyOnDefaultChannel(Function function, curChannelsGuard.readLock().lock(); try { - hld = channels.get(curChIdx); + if (!partitionAwarenessEnabled || channelsCnt.get() <= 1 || attempt != 0) + hld = channels.get(curChIdx); + else { + // Make first attempt with the random open channel. + int idx = ThreadLocalRandom.current().nextInt(channels.size()); + int idx0 = idx; + + do { + hld = channels.get(idx); + + if (++idx == channels.size()) + idx = 0; + } + while (hld.ch == null && idx != idx0); + } } finally { curChannelsGuard.readLock().unlock(); @@ -1010,6 +1040,8 @@ private ClientChannel getOrCreateChannel(boolean ignoreThrottling) } ch = channel; + + channelsCnt.incrementAndGet(); } } @@ -1024,6 +1056,8 @@ private synchronized void closeChannel() { U.closeQuiet(ch); ch = null; + + channelsCnt.decrementAndGet(); } } @@ -1055,7 +1089,7 @@ InetSocketAddress getAddress() { } /** - * Get holders reference. For test purposes.ClientOperation + * Get holders reference. For test purposes. */ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests. List getChannelHolders() { @@ -1070,6 +1104,13 @@ Map getNodeChannels() { return nodeChannels; } + /** + * Get index of current (default) channel holder. For test purposes. + */ + int getCurrentChannelIndex() { + return curChIdx; + } + /** * Get scheduledChannelsReinit reference. For test purposes. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java index fee4d55edb68d..62dd9f1d83e62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; import javax.cache.Cache; @@ -42,6 +43,7 @@ import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.client.ClientCache; import org.apache.ignite.client.ClientCacheConfiguration; +import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientDisconnectListener; import org.apache.ignite.client.ClientException; import org.apache.ignite.client.ClientFeatureNotSupportedByServerException; @@ -228,9 +230,11 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return true; - return ch.service( + TcpClientTransaction tx = transactions.tx(); + + return txAwareService(null, tx, ClientOperation.CACHE_CONTAINS_KEYS, - req -> writeKeys(keys, req), + req -> writeKeys(keys, req, tx), res -> res.in().readBoolean()); } @@ -242,9 +246,11 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return IgniteClientFutureImpl.completedFuture(true); - return ch.serviceAsync( + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, ClientOperation.CACHE_CONTAINS_KEYS, - req -> writeKeys(keys, req), + req -> writeKeys(keys, req, tx), res -> res.in().readBoolean()); } @@ -303,7 +309,12 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return new HashMap<>(); - return ch.service(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries); + TcpClientTransaction tx = transactions.tx(); + + return txAwareService(null, tx, + ClientOperation.CACHE_GET_ALL, + req -> writeKeys(keys, req, tx), + this::readEntries); } /** {@inheritDoc} */ @@ -314,7 +325,13 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return IgniteClientFutureImpl.completedFuture(new HashMap<>()); - return ch.serviceAsync(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries); + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, + ClientOperation.CACHE_GET_ALL, + req -> writeKeys(keys, req, tx), + this::readEntries); + } /** {@inheritDoc} */ @@ -325,12 +342,28 @@ public class TcpClientCache implements ClientCache { if (map.isEmpty()) return; - ch.request(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req)); + TcpClientTransaction tx = transactions.tx(); + + txAwareService(null, tx, + ClientOperation.CACHE_PUT_ALL, + req -> writeEntries(map, req, tx), + null); } /** {@inheritDoc} */ @Override public IgniteClientFuture putAllAsync(Map map) throws ClientException { - return ch.requestAsync(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req)); + if (map == null) + throw new NullPointerException("map"); + + if (map.isEmpty()) + return IgniteClientFutureImpl.completedFuture(null); + + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, + ClientOperation.CACHE_PUT_ALL, + req -> writeEntries(map, req, tx), + null); } /** {@inheritDoc} */ @@ -475,11 +508,14 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return; - ch.request( + TcpClientTransaction tx = transactions.tx(); + + txAwareService(null, tx, ClientOperation.CACHE_REMOVE_KEYS, req -> { - writeKeys(keys, req); - } + writeKeys(keys, req, tx); + }, + null ); } @@ -491,11 +527,14 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return IgniteClientFutureImpl.completedFuture(null); - return ch.requestAsync( - ClientOperation.CACHE_REMOVE_KEYS, - req -> { - writeKeys(keys, req); - } + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, + ClientOperation.CACHE_REMOVE_KEYS, + req -> { + writeKeys(keys, req, tx); + }, + null ); } @@ -707,9 +746,12 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return; - ch.request( + TcpClientTransaction tx = transactions.tx(); + + txAwareService(null, tx, ClientOperation.CACHE_CLEAR_KEYS, - req -> writeKeys(keys, req) + req -> writeKeys(keys, req, tx), + null ); } @@ -721,9 +763,12 @@ public class TcpClientCache implements ClientCache { if (keys.isEmpty()) return IgniteClientFutureImpl.completedFuture(null); - return ch.requestAsync( + TcpClientTransaction tx = transactions.tx(); + + return txAwareServiceAsync(null, tx, ClientOperation.CACHE_CLEAR_KEYS, - req -> writeKeys(keys, req) + req -> writeKeys(keys, req, tx), + null ); } @@ -1062,6 +1107,68 @@ private QueryCursor> sqlQuery(SqlQuery qry) { )); } + /** + * Execute operation on channel most suitable for transactional context. + */ + private T txAwareService( + @Nullable K affKey, + TcpClientTransaction tx, + ClientOperation op, + Consumer payloadWriter, + Function payloadReader + ) { + // Transactional operation cannot be executed on affinity node, it should be executed on node started + // the transaction. + if (tx != null) { + try { + return tx.clientChannel().service(op, payloadWriter, payloadReader); + } + catch (ClientConnectionException e) { + throw new ClientException("Transaction context has been lost due to connection errors. " + + "Cache operations are prohibited until current transaction closed.", e); + } + } + else if (affKey != null) + return ch.affinityService(cacheId, affKey, op, payloadWriter, payloadReader); + else + return ch.service(op, payloadWriter, payloadReader); + } + + /** + * Execute operation on channel most suitable for transactional context. + */ + private IgniteClientFuture txAwareServiceAsync( + @Nullable K affKey, + TcpClientTransaction tx, + ClientOperation op, + Consumer payloadWriter, + Function payloadReader + ) { + // Transactional operation cannot be executed on affinity node, it should be executed on node started + // the transaction. + if (tx != null) { + CompletableFuture fut = new CompletableFuture<>(); + + tx.clientChannel().serviceAsync(op, payloadWriter, payloadReader).whenComplete((res, err) -> { + if (err instanceof ClientConnectionException) { + fut.completeExceptionally( + new ClientException("Transaction context has been lost due to connection errors. " + + "Cache operations are prohibited until current transaction closed.", err)); + } + else if (err != null) + fut.completeExceptionally(err); + else + fut.complete(res); + }); + + return new IgniteClientFutureImpl<>(fut); + } + else if (affKey != null) + return ch.affinityServiceAsync(cacheId, affKey, op, payloadWriter, payloadReader); + else + return ch.serviceAsync(op, payloadWriter, payloadReader); + } + /** * Execute cache operation with a single key. */ @@ -1071,18 +1178,17 @@ private T cacheSingleKeyOperation( Consumer additionalPayloadWriter, Function payloadReader ) throws ClientException { + TcpClientTransaction tx = transactions.tx(); + Consumer payloadWriter = req -> { - writeCacheInfo(req); + writeCacheInfo(req, tx); writeObject(req, key); if (additionalPayloadWriter != null) additionalPayloadWriter.accept(req); }; - // Transactional operation cannot be executed on affinity node, it should be executed on node started - // the transaction. - return transactions.tx() == null ? ch.affinityService(cacheId, key, op, payloadWriter, payloadReader) : - ch.service(op, payloadWriter, payloadReader); + return txAwareService(key, tx, op, payloadWriter, payloadReader); } /** @@ -1094,31 +1200,32 @@ private IgniteClientFuture cacheSingleKeyOperationAsync( Consumer additionalPayloadWriter, Function payloadReader ) throws ClientException { + TcpClientTransaction tx = transactions.tx(); + Consumer payloadWriter = req -> { - writeCacheInfo(req); + writeCacheInfo(req, tx); writeObject(req, key); if (additionalPayloadWriter != null) additionalPayloadWriter.accept(req); }; - // Transactional operation cannot be executed on affinity node, it should be executed on node started - // the transaction. - return transactions.tx() == null - ? ch.affinityServiceAsync(cacheId, key, op, payloadWriter, payloadReader) - : ch.serviceAsync(op, payloadWriter, payloadReader); + return txAwareServiceAsync(key, tx, op, payloadWriter, payloadReader); } - /** Write cache ID and flags. */ + /** Write cache ID and flags for non-transactional operations. */ private void writeCacheInfo(PayloadOutputChannel payloadCh) { + writeCacheInfo(payloadCh, null); + } + + /** Write cache ID and flags. */ + private void writeCacheInfo(PayloadOutputChannel payloadCh, TcpClientTransaction tx) { BinaryOutputStream out = payloadCh.out(); out.writeInt(cacheId); byte flags = keepBinary ? KEEP_BINARY_FLAG_MASK : 0; - TcpClientTransaction tx = transactions.tx(); - if (expiryPlc != null) { ProtocolContext protocolCtx = payloadCh.clientChannel().protocolCtx(); @@ -1130,14 +1237,8 @@ private void writeCacheInfo(PayloadOutputChannel payloadCh) { flags |= WITH_EXPIRY_POLICY_FLAG_MASK; } - if (tx != null) { - if (tx.clientChannel() != payloadCh.clientChannel()) { - throw new ClientException("Transaction context has been lost due to connection errors. " + - "Cache operations are prohibited until current transaction closed."); - } - + if (tx != null) flags |= TRANSACTIONAL_FLAG_MASK; - } out.writeByte(flags); @@ -1177,8 +1278,8 @@ private void writeObject(PayloadOutputChannel payloadCh, Object obj) { } /** */ - private void writeKeys(Set keys, PayloadOutputChannel req) { - writeCacheInfo(req); + private void writeKeys(Set keys, PayloadOutputChannel req, TcpClientTransaction tx) { + writeCacheInfo(req, tx); ClientUtils.collection(keys, req.out(), serDes::writeObject); } @@ -1196,8 +1297,8 @@ private Map readEntries(PayloadInputChannel res) { } /** */ - private void writeEntries(Map map, PayloadOutputChannel req) { - writeCacheInfo(req); + private void writeEntries(Map map, PayloadOutputChannel req, TcpClientTransaction tx) { + writeCacheInfo(req, tx); ClientUtils.collection( map.entrySet(), req.out(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java index 3552968c37050..1b72dab3674af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientException; import org.apache.ignite.client.ClientTransaction; import org.apache.ignite.client.ClientTransactions; @@ -238,15 +239,15 @@ private TcpClientTransaction(int id, ClientChannel clientCh) { */ private void endTx(boolean committed) { try { - ch.service(ClientOperation.TX_END, + clientCh.service(ClientOperation.TX_END, req -> { - if (clientCh != req.clientChannel()) - throw new ClientException("Transaction context has been lost due to connection errors"); - req.out().writeInt(txId); req.out().writeBoolean(committed); }, null); } + catch (ClientConnectionException e) { + throw new ClientException("Transaction context has been lost due to connection errors", e); + } finally { txMap.remove(txUid); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 619a84ef0fe43..1c07f5e279ce2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -3301,6 +3301,11 @@ public static long totalSize(FileDescriptor... fileDescriptors) { return len; } + /** @return WAL cdc directory (including consistent ID as subfolder) */ + @Nullable public File walCdcDirectory() { + return walCdcDir; + } + /** * Check if WAL archive is unlimited. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java new file mode 100644 index 0000000000000..b63e18d757ae9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.cdc; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.cdc.CdcFileLockHolder; +import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.cdc.CdcConsumerState.WAL_STATE_FILE_NAME; +import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; + +/** + * Task to delete lost segment CDC links. + */ +@GridInternal +public class VisorCdcDeleteLostSegmentsTask extends VisorMultiNodeTask { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob job(Void arg) { + return new VisorCdcDeleteLostSegmentsJob(arg, false); + } + + /** {@inheritDoc} */ + @Override protected @Nullable Void reduce0(List results) throws IgniteException { + for (ComputeJobResult res : results) { + if (res.getException() != null) { + throw new IgniteException("Failed to delete lost segment CDC links on a node " + + "[nodeId=" + res.getNode().id() + ']', res.getException()); + } + } + + return null; + } + + /** */ + private static class VisorCdcDeleteLostSegmentsJob extends VisorJob { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected logger. */ + @LoggerResource + protected IgniteLogger log; + + /** + * Create job with specified argument. + * + * @param arg Job argument. + * @param debug Flag indicating whether debug information should be printed into node log. + */ + protected VisorCdcDeleteLostSegmentsJob(Void arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Void run(Void arg) throws IgniteException { + FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ignite.context().cache().context().wal(true); + + File walCdcDir = wal.walCdcDirectory(); + + if (walCdcDir == null) + throw new IgniteException("CDC is not configured."); + + CdcFileLockHolder lock = new CdcFileLockHolder(walCdcDir.getAbsolutePath(), "Delete lost segments job", log); + + try { + lock.tryLock(1); + + try (Stream cdcFiles = Files.list(walCdcDir.toPath())) { + Set delete = new HashSet<>(); + + AtomicLong lastSgmnt = new AtomicLong(-1); + + cdcFiles + .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) + .sorted(Comparator.comparingLong(CdcMain::segmentIndex) + .reversed()) // Sort by segment index. + .forEach(path -> { + long idx = CdcMain.segmentIndex(path); + + if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { + lastSgmnt.set(idx); + + return; + } + + delete.add(path.toFile()); + }); + + if (delete.isEmpty()) { + log.info("Lost segment CDC links were not found."); + + return null; + } + + log.info("Found lost segment CDC links. The following links will be deleted: " + delete); + + delete.forEach(file -> { + if (!file.delete()) { + throw new IgniteException("Failed to delete lost segment CDC link [file=" + + file.getAbsolutePath() + ']'); + } + + log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']'); + }); + + Path stateDir = walCdcDir.toPath().resolve(STATE_DIR); + + if (stateDir.toFile().exists()) { + File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); + + if (walState.exists() && !walState.delete()) { + throw new IgniteException("Failed to delete wal state file [file=" + + walState.getAbsolutePath() + ']'); + } + } + } + catch (IOException e) { + throw new RuntimeException("Failed to delete lost segment CDC links.", e); + } + } + catch (IgniteCheckedException e) { + throw new RuntimeException("Failed to delete lost segment CDC links. " + + "Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " + + "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']'); + } + finally { + U.closeQuiet(lock); + } + + return null; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 6e748c2eb9363..8d7ac46db2885 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -62,6 +62,8 @@ import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.junit.Test; import org.junit.runner.RunWith; @@ -788,6 +790,20 @@ public void testCdcDirectoryMaxSize() throws Exception { assertThrows(log, () -> fut.get(getTestTimeout()), IgniteCheckedException.class, "Found missed segments. Some events are missed."); + + ign.compute().execute(VisorCdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); + + cnsmr.data.clear(); + + cdc = createCdc(cnsmr, getConfiguration(ign.name())); + + IgniteInternalFuture f = runAsync(cdc); + + waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + + assertFalse(f.isDone()); + + f.cancel(); } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java similarity index 96% rename from modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java rename to modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java index 792a9980e30e4..c278fc5f3d853 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.client; +package org.apache.ignite.internal.client.thin; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; @@ -57,6 +57,16 @@ import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.client.ClientCache; +import org.apache.ignite.client.ClientCacheConfiguration; +import org.apache.ignite.client.ClientConnectionException; +import org.apache.ignite.client.ClientException; +import org.apache.ignite.client.ClientTransaction; +import org.apache.ignite.client.Comparers; +import org.apache.ignite.client.Config; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.client.LocalIgniteCluster; +import org.apache.ignite.client.Person; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; @@ -66,15 +76,12 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.AbstractBinaryArraysTest; -import org.apache.ignite.internal.client.thin.ClientServerError; import org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum; -import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy; import org.apache.ignite.internal.processors.platform.client.ClientStatus; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.mxbean.ClientProcessorMXBean; import org.apache.ignite.spi.systemview.view.SystemView; import org.apache.ignite.spi.systemview.view.TransactionView; import org.apache.ignite.testframework.GridTestUtils; @@ -828,8 +835,11 @@ public void testTxResumeAfterTxTimeout() throws Exception { */ @Test public void testTransactions() throws Exception { - try (Ignite ignite = Ignition.start(Config.getServerConfiguration()); - IgniteClient client = Ignition.startClient(getClientConfiguration()) + int clusterSize = 3; + + try (LocalIgniteCluster cluster = LocalIgniteCluster.start(clusterSize); + IgniteClient client = Ignition.startClient(getClientConfiguration() + .setAddresses(cluster.clientAddresses().toArray(new String[clusterSize]))) ) { ClientCache cache = client.createCache(new ClientCacheConfiguration() .setName("cache") @@ -937,13 +947,10 @@ public void testTransactions() throws Exception { cache.put(1, "value5"); // Test failover. - ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients", - ClientListenerProcessor.class, ClientProcessorMXBean.class); - try (ClientTransaction tx = client.transactions().txStart()) { cache.put(1, "value6"); - mxBean.dropAllConnections(); + ((TcpClientTransactions.TcpClientTransaction)tx).clientChannel().close(); try { cache.put(1, "value7"); @@ -1157,9 +1164,35 @@ public void testTransactions() throws Exception { assertEquals("value23", cache.get(0)); } + // Test that new transaction can be started after commit of the previous one without closing. + ClientTransaction tx = client.transactions().txStart(); + tx.commit(); + + tx = client.transactions().txStart(); + tx.rollback(); + + // Test that new transaction can be started after rollback of the previous one without closing. + tx = client.transactions().txStart(); + tx.commit(); + + // Test that implicit transaction started after commit of previous one without closing. + cache.put(0, "value24"); + + GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get(); + } + } + + /** + * Test transactions. + */ + @Test + public void testTransactionsLimit() throws Exception { + try (IgniteEx ignite = (IgniteEx)Ignition.start(Config.getServerConfiguration()); + IgniteClient client = Ignition.startClient(getClientConfiguration()) + ) { // Test active transactions limit. - int txLimit = ignite.configuration().getClientConnectorConfiguration().getThinClientConfiguration() - .getMaxActiveTxPerConnection(); + int txLimit = ignite.configuration().getClientConnectorConfiguration() + .getThinClientConfiguration().getMaxActiveTxPerConnection(); List txs = new ArrayList<>(txLimit); @@ -1181,22 +1214,6 @@ public void testTransactions() throws Exception { for (ClientTransaction tx : txs) tx.close(); - - // Test that new transaction can be started after commit of the previous one without closing. - ClientTransaction tx = client.transactions().txStart(); - tx.commit(); - - tx = client.transactions().txStart(); - tx.rollback(); - - // Test that new transaction can be started after rollback of the previous one without closing. - tx = client.transactions().txStart(); - tx.commit(); - - // Test that implicit transaction started after commit of previous one without closing. - cache.put(0, "value24"); - - GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get(); } } @@ -1205,8 +1222,11 @@ public void testTransactions() throws Exception { */ @Test public void testTransactionsAsync() throws Exception { - try (Ignite ignored = Ignition.start(Config.getServerConfiguration()); - IgniteClient client = Ignition.startClient(getClientConfiguration()) + int clusterSize = 3; + + try (LocalIgniteCluster cluster = LocalIgniteCluster.start(clusterSize); + IgniteClient client = Ignition.startClient(getClientConfiguration() + .setAddresses(cluster.clientAddresses().toArray(new String[clusterSize]))) ) { ClientCache cache = client.createCache(new ClientCacheConfiguration() .setName("cache") @@ -1240,6 +1260,21 @@ public void testTransactionsAsync() throws Exception { } assertEquals("value2", cache.get(1)); + + // Test multi-key operations. + try (ClientTransaction tx = client.transactions().txStart()) { + cache.putAllAsync(F.asMap(1, "value3", 2, "value3")).get(); + + assertEquals(F.asMap(1, "value3", 2, "value3"), + cache.getAllAsync(new HashSet<>(F.asList(1, 2))).get()); + + cache.removeAllAsync(new HashSet<>(F.asList(1, 2))).get(); + + assertFalse(cache.containsKeysAsync(new HashSet<>(F.asList(1, 2))).get()); + } + + assertEquals("value2", cache.get(1)); + assertFalse(cache.containsKey(2)); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java index 713979a5303b0..77038a09a7fd9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java @@ -20,9 +20,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -77,7 +79,7 @@ public void testDuplicatedAddressesAreValid() { /** * Checks that in case if address specified without port, the default port will be processed first - * */ + */ @Test public void testAddressWithoutPort() { ClientConfiguration ccfg = new ClientConfiguration().setAddresses("127.0.0.1"); @@ -88,28 +90,38 @@ public void testAddressWithoutPort() { assertEquals(ClientConnectorConfiguration.DFLT_PORT_RANGE + 1, rc.getChannelHolders().size()); - assertEquals(ClientConnectorConfiguration.DFLT_PORT, rc.getChannelHolders().iterator().next().getAddress().getPort()); + assertEquals(ClientConnectorConfiguration.DFLT_PORT, F.first(rc.getChannelHolders()).getAddress().getPort()); + + assertEquals(0, rc.getCurrentChannelIndex()); } /** - * Checks that ReliableChannel provides channels in the same order as in ClientConfiguration. - * */ + * Checks that ReliableChannel chooses random address as default from the set of addresses with the same (minimal) port. + */ @Test - public void testAddressesOrder() { - String[] addrs = new String[] {"127.0.0.1:10803", "127.0.0.1:10802", "127.0.0.1:10801", "127.0.0.1:10800"}; + public void testDefaultChannelBalancing() { + assertEquals(new HashSet<>(F.asList("127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")), + usedDefaultChannels("127.0.0.1:10801..10809", "127.0.0.2", "127.0.0.3:10800", "127.0.0.4:10800..10809")); + + assertEquals(new HashSet<>(F.asList("127.0.0.1:10800", "127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")), + usedDefaultChannels("127.0.0.1:10800", "127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")); + } + /** */ + private Set usedDefaultChannels(String... addrs) { ClientConfiguration ccfg = new ClientConfiguration().setAddresses(addrs); - ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null); + Set usedChannels = new HashSet<>(); - rc.channelsInit(); + for (int i = 0; i < 100; i++) { + ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null); - List holders = rc.getChannelHolders(); + rc.channelsInit(); - assertEquals(addrs.length, holders.size()); + usedChannels.add(rc.getChannelHolders().get(rc.getCurrentChannelIndex()).getAddress().toString()); + } - for (int i = 0; i < addrs.length; i++) - assertEquals(addrs[i], holders.get(i).getAddress().toString()); + return usedChannels; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java index ba2624aeef3e8..9ffaa31a84a0c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT; @@ -83,9 +84,6 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo /** Operations queue. */ protected final Queue> opsQueue = new ConcurrentLinkedQueue<>(); - /** Default channel. */ - protected TestTcpClientChannel dfltCh; - /** Client instance. */ protected IgniteClient client; @@ -143,7 +141,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo /** * Checks that operation goes through specified channel. */ - protected void assertOpOnChannel(TestTcpClientChannel expCh, ClientOperation expOp) { + protected void assertOpOnChannel(@Nullable TestTcpClientChannel expCh, ClientOperation expOp) { T2 nextChOp = opsQueue.poll(); assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", expOp=" + expOp + ']', nextChOp); @@ -151,8 +149,10 @@ protected void assertOpOnChannel(TestTcpClientChannel expCh, ClientOperation exp assertEquals("Unexpected operation on channel [expCh=" + expCh + ", expOp=" + expOp + ", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2()); - assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp + - ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1()); + if (expCh != null) { + assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp + + ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1()); + } } /** @@ -175,7 +175,7 @@ protected TestTcpClientChannel nodeChannel(UUID nodeId) { return channels[i]; } - return dfltCh; + return null; } /** @@ -212,25 +212,19 @@ protected void initClient(ClientConfiguration clientCfg, int... chIdxs) throws I awaitChannelsInit(chIdxs); - initDefaultChannel(); + opsQueue.clear(); } /** - * + * Trigger client to detect topology change. */ - protected void initDefaultChannel() { + protected void detectTopologyChange() { opsQueue.clear(); - // Send non-affinity request to determine default channel. + // Send non-affinity request to detect topology change. client.getOrCreateCache(REPL_CACHE_NAME); - T2 nextChOp = opsQueue.poll(); - - assertNotNull(nextChOp); - - assertEquals(nextChOp.get2(), ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME); - - dfltCh = nextChOp.get1(); + assertOpOnChannel(null, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java new file mode 100644 index 0000000000000..f89dffe8c3f97 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.junit.Test; + +/** + * Test requests to connections distribution by thin client. + */ +public class ThinClientPartitionAwarenessBalancingTest extends ThinClientAbstractPartitionAwarenessTest { + /** */ + @Test + public void testConnectionDistribution() throws Exception { + startGrids(3); + + initClient(getClientConfiguration(0, 1, 2, 3), 0, 1, 2); + + BitSet usedConnections = new BitSet(); + + for (int i = 0; i < 100; i++) + client.cacheNames(); // Non-affinity requests should be randomly distributed among connections. + + List channelList = F.asList(channels); + + while (!opsQueue.isEmpty()) { + T2 op = opsQueue.poll(); + + usedConnections.set(channelList.indexOf(op.get1())); + } + + assertEquals(BitSet.valueOf(new byte[] {7}), usedConnections); // 7 = set of {0, 1, 2} + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java index d7611842ab89b..dff4d8ea11876 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java @@ -123,12 +123,15 @@ private void testPartitionAwareness(int... chIdxs) { clientCache.put(i, i); if (i == 0) - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); assertOpOnChannel(opCh, ClientOperation.CACHE_PUT); - assertTrue(channelHits.containsKey(opCh)); - channelHits.compute(opCh, (c, old) -> true); + if (opCh != null) { + assertTrue(channelHits.containsKey(opCh)); + + channelHits.compute(opCh, (c, old) -> true); + } } assertFalse(channelHits.containsValue(false)); @@ -156,12 +159,4 @@ private ClientConfiguration getClientConfigurationWithDiscovery(int... excludeId .setAddressesFinder(addrFinder) .setPartitionAwarenessEnabled(true); } - - /** - * Trigger client to detect topology change. - */ - private void detectTopologyChange() { - // Send non-affinity request to detect topology change. - initDefaultChannel(); - } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java index 279d31865cf29..6de86591c824b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java @@ -88,7 +88,7 @@ public void testResourcesReleasedAfterCacheDestroyed() throws Exception { clientCache.put(0, 0); TestTcpClientChannel opCh = affinityChannel(0, gridCache); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); assertOpOnChannel(opCh, ClientOperation.CACHE_PUT); for (int i = 1; i < KEY_CNT; i++) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java index 5556da02ee50d..b94062b098829 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java @@ -148,8 +148,8 @@ public void testPartitionedCacheUnknownNode() throws IgniteCheckedException { clientCache.put(keyForUnknownNode, 0); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PUT); } /** @@ -191,18 +191,14 @@ public void testScanQuery() throws IgniteCheckedException { for (int i = 0; i < GRIDS_CNT; i++) { int part = grid(i).affinity(PART_CACHE_NAME).primaryPartitions(grid(i).localNode())[0]; - // Client doesn't have connection with grid(0). - TestTcpClientChannel ch = i == 0 ? dfltCh : nodeChannel(grid(i).localNode().id()); + TestTcpClientChannel ch = nodeChannel(grid(i).localNode().id()); // Test scan query with specified partition. clientCache.query(new ScanQuery<>().setPartition(part)).getAll(); + // Client doesn't have connection with grid(0), ch will be null for this grid + // and operation on any channel is acceptable. assertOpOnChannel(ch, ClientOperation.QUERY_SCAN); - - // Test scan query without specified partition. - clientCache.query(new ScanQuery<>()).getAll(); - - assertOpOnChannel(dfltCh, ClientOperation.QUERY_SCAN); } } @@ -345,17 +341,17 @@ private void testNotApplicableCache(String cacheName) { // After first response we should send partitions request on default channel together with next request. cache.put(0, 0); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PUT); for (int i = 1; i < KEY_CNT; i++) { cache.put(i, i); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT); + assertOpOnChannel(null, ClientOperation.CACHE_PUT); cache.get(i); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET); + assertOpOnChannel(null, ClientOperation.CACHE_GET); } } @@ -371,9 +367,7 @@ private void testApplicableCache(String cacheName, Function key TestTcpClientChannel opCh = affinityChannel(keyFactory.apply(0), igniteCache); - // Default channel is the first who detects topology change, so next partition request will go through - // the default channel. - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); assertOpOnChannel(opCh, ClientOperation.CACHE_PUT); for (int i = 1; i < KEY_CNT; i++) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java index e167ab1652c03..dfe93a97c6906 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java @@ -60,7 +60,7 @@ public void testPartitionAwarenessOnNodeJoin() throws Exception { awaitChannelsInit(3); - assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME); + assertOpOnChannel(null, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME); Integer key = primaryKey(grid(3).cache(PART_CACHE_NAME)); @@ -68,9 +68,7 @@ public void testPartitionAwarenessOnNodeJoin() throws Exception { cache.put(key, 0); - // Cache partitions are requested from default channel, since it's first (and currently the only) channel - // which detects new topology. - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); assertOpOnChannel(channels[3], ClientOperation.CACHE_PUT); @@ -98,8 +96,8 @@ public void testPartitionAwarenessOnNodeLeft() throws Exception { awaitPartitionMapExchange(); - // Next request will also detect topology change. - initDefaultChannel(); + // Detect topology change. + detectTopologyChange(); // Test partition awareness after node join. testPartitionAwareness(true); @@ -119,8 +117,8 @@ public void testConnectionLoss() throws Exception { // Test partition awareness before connection to node lost. testPartitionAwareness(true); - // Choose node to disconnect (not default node). - int disconnectNodeIdx = dfltCh == channels[0] ? 1 : 0; + // Choose node to disconnect. + int disconnectNodeIdx = 0; // Drop all thin connections from the node. getMxBean(grid(disconnectNodeIdx).name(), "Clients", @@ -137,8 +135,8 @@ public void testConnectionLoss() throws Exception { cache.put(key, 0); - // Request goes to default channel, since affinity node is disconnected. - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT); + // Request goes to the connected channel, since affinity node is disconnected. + assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT); cache.put(key, 0); @@ -175,7 +173,7 @@ public void testPartitionAwarenessOnClusterRestart() throws Exception { // Send any request to failover. client.cache(REPL_CACHE_NAME).put(0, 0); - initDefaultChannel(); + detectTopologyChange(); awaitChannelsInit(0, 1); @@ -197,7 +195,7 @@ private void testPartitionAwareness(boolean partReq) { clientCache.put(i, i); if (partReq) { - assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS); + assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS); partReq = false; } diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index 08f1081d34c8c..0f27ca2680b41 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -343,6 +343,13 @@ If the file name isn't specified the output file name is: '.bin' Finalize partitions update counters: control.(sh|bat) --consistency finalize + [EXPERIMENTAL] + Delete lost segment CDC links: + control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] + + Parameters: + node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes. + By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output index 08f1081d34c8c..0f27ca2680b41 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output @@ -343,6 +343,13 @@ If the file name isn't specified the output file name is: '.bin' Finalize partitions update counters: control.(sh|bat) --consistency finalize + [EXPERIMENTAL] + Delete lost segment CDC links: + control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] + + Parameters: + node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes. + By default commands affecting the cluster require interactive confirmation. Use --yes option to disable it. diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java index 4a97cd5d1ee22..83b6f0fa86d11 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java @@ -24,12 +24,14 @@ import org.apache.ignite.internal.client.thin.ClusterGroupTest; import org.apache.ignite.internal.client.thin.ComputeTaskTest; import org.apache.ignite.internal.client.thin.DataReplicationOperationsTest; +import org.apache.ignite.internal.client.thin.FunctionalTest; import org.apache.ignite.internal.client.thin.IgniteSetTest; import org.apache.ignite.internal.client.thin.MetadataRegistrationTest; import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest; import org.apache.ignite.internal.client.thin.ReliableChannelTest; import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests; import org.apache.ignite.internal.client.thin.ServicesTest; +import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessBalancingTest; import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscoveryTest; import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest; import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest; @@ -69,6 +71,7 @@ ThinClientPartitionAwarenessUnstableTopologyTest.class, ThinClientPartitionAwarenessResourceReleaseTest.class, ThinClientPartitionAwarenessDiscoveryTest.class, + ThinClientPartitionAwarenessBalancingTest.class, ReliableChannelTest.class, CacheAsyncTest.class, TimeoutTest.class,