Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion docs/_docs/persistence/change-data-capture.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,36 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c
5. Infinitely wait for the newly available segment and process it.
6. Stop the consumer in case of a failure or a received stop signal.

== Handling skipped segments

The CDC can be disabled manually or by configured directory maximum size. In this case a hard link creation will be skipped.

WARNING: All changes in skipped segments will be lost!

So when enabled there will be gap between segments: `0000000000000002.wal`, `0000000000000010.wal`, `0000000000000011.wal`, for example.
In this case `ignite-cdc.sh` will fail with the something like "Found missed segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]".

NOTE: Make sure you need to sync data before restarting the CDC application. You can synchronize caches using
snapshot or other methods.

To fix this error you can run the following link:tools/control-script[Control Script] command:

[source,shell]
----
# Delete lost segment CDC links in the cluster.
control.sh|bat --cdc delete_lost_segment_links

# Delete lost segment CDC links on a node.
control.sh|bat --cdc delete_lost_segment_links --node-id node_id
----

The command will remove all segment links before the last gap.

For example, CDC was turned off several times: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`, `0000000000000010.wal`, `0000000000000011.wal`
Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`.
The application will start from the `0000000000000010.wal` segment after being enabled.

== cdc-ext

Ignite extensions project has link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[cdc-ext] module which provides two way to setup cross cluster replication based on CDC.
Detailed documentation can be found on link:extensions-and-integrations/change-data-capture-extensions[page].
Detailed documentation can be found on link:extensions-and-integrations/change-data-capture-extensions[page].
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public static SqlNodeList parse(String qry, SqlParser.Config parserCfg) {
return parse(new SourceStringReader(qry), parserCfg);
}
catch (SqlParseException e) {
throw new IgniteSQLException("Failed to parse query.", IgniteQueryErrorCode.PARSING, e);
throw new IgniteSQLException("Failed to parse query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,21 @@ public void createTableIfNotExists() {
sql("create table if not exists my_table (id int, val varchar)");
}

/**
* Create table using reserved word
*/
@Test
public void createTableUseReservedWord() {
assertThrows("create table table (id int primary key, val varchar)", IgniteSQLException.class,
"Failed to parse query. Encountered \"table table\"");

sql("create table \"table\" (id int primary key, val varchar)");

sql("insert into \"table\" (id, val) values (0, '1')");

assertQuery("select * from \"table\" ").returns(0, "1").check();
}

/**
* Creates a table without a primary key and then insert a few rows.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -149,6 +150,20 @@ public int nextIntArg(String argName) {
}
}

/** @return UUID value. */
public UUID nextUuidArg(String argName) {
String str = nextArg("Expecting " + argName + " command argument.");

try {
return UUID.fromString(str);
}
catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Failed to parse " + argName + " command argument." +
" String representation of \"java.util.UUID\" is exepected. For example:" +
" 123e4567-e89b-42d3-a456-556642440000", e);
}
}

/**
* @param argName Name of argument.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.commandline;

import org.apache.ignite.internal.commandline.cache.CacheCommands;
import org.apache.ignite.internal.commandline.cdc.CdcCommand;
import org.apache.ignite.internal.commandline.consistency.ConsistencyCommand;
import org.apache.ignite.internal.commandline.diagnostic.DiagnosticCommand;
import org.apache.ignite.internal.commandline.encryption.EncryptionCommands;
Expand Down Expand Up @@ -103,7 +104,10 @@ public enum CommandList {
PERFORMANCE_STATISTICS("--performance-statistics", new PerformanceStatisticsCommand()),

/** Command to check/repair consistency. */
CONSISTENCY("--consistency", new ConsistencyCommand());
CONSISTENCY("--consistency", new ConsistencyCommand()),

/** Cdc commands. */
CDC("--cdc", new CdcCommand());

/** Private values copy so there's no need in cloning it every time. */
private static final CommandList[] VALUES = CommandList.values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class CommonArgParser {
static final String CMD_USER = "--user";

/** Option is used for auto confirmation. */
static final String CMD_AUTO_CONFIRMATION = "--yes";
public static final String CMD_AUTO_CONFIRMATION = "--yes";

/** */
static final String CMD_PING_INTERVAL = "--ping-interval";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.commandline.cdc;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.client.GridClientNode;
import org.apache.ignite.internal.commandline.AbstractCommand;
import org.apache.ignite.internal.commandline.Command;
import org.apache.ignite.internal.commandline.CommandArgIterator;
import org.apache.ignite.internal.commandline.CommandLogger;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask;

import static org.apache.ignite.internal.commandline.CommandList.CDC;
import static org.apache.ignite.internal.commandline.CommandLogger.optional;
import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION;
import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;

/**
* CDC command.
*/
public class CdcCommand extends AbstractCommand<String> {
/** Command to delete lost segment links. */
public static final String DELETE_LOST_SEGMENT_LINKS = "delete_lost_segment_links";

/** */
public static final String NODE_ID = "--node-id";

/** Node ID. */
private UUID nodeId;

/** {@inheritDoc} */
@Override public Object execute(GridClientConfiguration clientCfg, IgniteLogger log) throws Exception {
try (GridClient client = Command.startClient(clientCfg)) {
executeTaskByNameOnNode(
client,
VisorCdcDeleteLostSegmentsTask.class.getName(),
null,
nodeId,
clientCfg
);

Collection<UUID> nodeIds = nodeId != null ? Collections.singletonList(nodeId) :
client.compute().nodes(node -> !node.isClient()).stream().map(GridClientNode::nodeId)
.collect(Collectors.toSet());

client.compute().execute(VisorCdcDeleteLostSegmentsTask.class.getName(),
new VisorTaskArgument<>(nodeIds, false));

String res = "Lost segment CDC links successfully removed.";

log.info(res);

return res;
}
catch (Throwable e) {
log.error("Failed to perform operation.");
log.error(CommandLogger.errorMessage(e));

throw e;
}
}

/** {@inheritDoc} */
@Override public void parseArguments(CommandArgIterator argIter) {
nodeId = null;

String cmd = argIter.nextArg("Expected command: " + DELETE_LOST_SEGMENT_LINKS);

if (!DELETE_LOST_SEGMENT_LINKS.equalsIgnoreCase(cmd))
throw new IllegalArgumentException("Unexpected command: " + cmd);

while (argIter.hasNextSubArg()) {
String opt = argIter.nextArg("Failed to read command argument.");

if (NODE_ID.equalsIgnoreCase(opt))
nodeId = argIter.nextUuidArg(NODE_ID);
}
}

/** {@inheritDoc} */
@Override public String confirmationPrompt() {
return "Warning: The command will fix WAL segments gap in case CDC link creation was stopped by distributed " +
"property or excess of maximum CDC directory size. Gap will be fixed by deletion of WAL segment links" +
"previous to the last gap." + U.nl() +
"All changes in deleted segment links will be lost!" + U.nl() +
"Make sure you need to sync data before restarting the CDC application. You can synchronize caches " +
"using snapshot or other methods.";
}

/** {@inheritDoc} */
@Override public String arg() {
return null;
}

/** {@inheritDoc} */
@Override public void printUsage(IgniteLogger logger) {
Map<String, String> params = new LinkedHashMap<>();

params.put("node_id", "ID of the node to delete lost segment links from. If not set, the command will affect " +
"all server nodes.");

usage(logger, "Delete lost segment CDC links:", CDC, params, DELETE_LOST_SEGMENT_LINKS,
optional(NODE_ID, "node_id"), optional(CMD_AUTO_CONFIRMATION));
}

/** {@inheritDoc} */
@Override public String name() {
return "cdc";
}

/** {@inheritDoc} */
@Override public boolean experimental() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,8 @@ else if (arg().bounds() == null && arg().rateTimeInterval() < 0)

MetricCommandArg cmdArg = CommandArgUtils.of(arg, MetricCommandArg.class);

if (cmdArg == NODE_ID) {
String nodeIdArg = argIter.nextArg(
"ID of the node from which metric values should be obtained is expected.");

try {
nodeId = UUID.fromString(nodeIdArg);
}
catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Failed to parse " + NODE_ID + " command argument." +
" String representation of \"java.util.UUID\" is exepected. For example:" +
" 123e4567-e89b-42d3-a456-556642440000", e);
}
}
if (cmdArg == NODE_ID)
nodeId = argIter.nextUuidArg(NODE_ID.argName());
else if (cmdArg == CONFIGURE_HISTOGRAM || cmdArg == CONFIGURE_HITRATE) {
if (metricName != null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND;
import static org.apache.ignite.internal.QueryMXBeanImpl.EXPECTED_GLOBAL_QRY_ID_FORMAT;
import static org.apache.ignite.internal.commandline.CommandList.CACHE;
import static org.apache.ignite.internal.commandline.CommandList.CDC;
import static org.apache.ignite.internal.commandline.CommandList.CLUSTER_CHANGE_TAG;
import static org.apache.ignite.internal.commandline.CommandList.SET_STATE;
import static org.apache.ignite.internal.commandline.CommandList.SHUTDOWN_POLICY;
Expand All @@ -78,6 +79,8 @@
import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES;
import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST;
import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH;
import static org.apache.ignite.internal.commandline.cdc.CdcCommand.DELETE_LOST_SEGMENT_LINKS;
import static org.apache.ignite.internal.commandline.cdc.CdcCommand.NODE_ID;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -412,6 +415,8 @@ else if (cmdL == CLUSTER_CHANGE_TAG)
args = parseArgs(asList(cmdL.text(), "newTagValue"));
else if (cmdL == WARM_UP)
args = parseArgs(asList(cmdL.text(), "--stop"));
else if (cmdL == CDC)
args = parseArgs(asList(cmdL.text(), DELETE_LOST_SEGMENT_LINKS, NODE_ID, UUID.randomUUID().toString()));
else
args = parseArgs(asList(cmdL.text()));

Expand Down Expand Up @@ -497,6 +502,15 @@ else if (cmdL == WARM_UP)
break;
}

case CDC: {
args = parseArgs(asList(cmdL.text(), DELETE_LOST_SEGMENT_LINKS,
NODE_ID, UUID.randomUUID().toString(), "--yes"));

checkCommonParametersCorrectlyParsed(cmdL, args, true);

break;
}

default:
fail("Unknown command: " + cmd);
}
Expand Down Expand Up @@ -1232,6 +1246,7 @@ private boolean requireArgs(@Nullable CommandList cmd) {
cmd == CommandList.METRIC ||
cmd == CommandList.DEFRAGMENTATION ||
cmd == CommandList.PERFORMANCE_STATISTICS ||
cmd == CommandList.CONSISTENCY;
cmd == CommandList.CONSISTENCY ||
cmd == CDC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ignite.internal.commandline.indexreader.IgniteIndexReaderTest;
import org.apache.ignite.util.CacheMetricsCommandTest;
import org.apache.ignite.util.CdcCommandTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest;
Expand Down Expand Up @@ -64,7 +65,9 @@
PerformanceStatisticsCommandTest.class,
CacheMetricsCommandTest.class,

IgniteIndexReaderTest.class
IgniteIndexReaderTest.class,

CdcCommandTest.class
})
public class IgniteControlUtilityTestSuite2 {
}
Loading