Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -91,6 +90,7 @@
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGroupsWithRestartsTest;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessor;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusDetails;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
Expand Down Expand Up @@ -3214,29 +3214,31 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception {
((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());

// Restore single cache group.
assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--start", DEFAULT_CACHE_NAME));
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation started [snapshot=" + snpName + ", group(s)=" + DEFAULT_CACHE_NAME + ']');
ig.snapshot().restoreSnapshot(snpName, Collections.singleton(DEFAULT_CACHE_NAME));

assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status"));
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation is running [snapshot=" + snpName + ']');
"Restore operation for snapshot \"" + snpName + "\" is still in progress");

// Check wrong snapshot name.
assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", missingSnpName, "--status"));
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation is NOT running [snapshot=" + missingSnpName + ']');
"No information about restoring snapshot \"" + missingSnpName + "\" is available.");

assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", missingSnpName, "--cancel"));
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation is not in progress [snapshot=" + missingSnpName + ']');

GridTestUtils.runAsync(() -> {
IgniteInternalFuture<Object> fut = runAsync(() -> {
// Wait for the process to be interrupted.
AtomicReference<?> errRef = U.field((Object)U.field((Object)U.field(
grid(0).context().cache().context().snapshotMgr(), "restoreCacheGrpProc"), "opCtx"), "err");
boolean canceled = waitForCondition(() -> {
SnapshotRestoreStatusDetails status =
grid(0).context().cache().context().snapshotMgr().localRestoreStatus(snpName);

return status != null && status.errorMessage() != null;
}, getTestTimeout());

waitForCondition(() -> errRef.get() != null, getTestTimeout());
assertTrue(canceled);

spi.stopBlock();

Expand All @@ -3247,9 +3249,21 @@ public void testSnapshotRestoreCancelAndStatus() throws Exception {
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation canceled [snapshot=" + snpName + ']');

fut.get(getTestTimeout());

assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status"));
assertContains(log, testOut.toString(),
"Snapshot cache group restore operation is NOT running [snapshot=" + snpName + ']');
"Error: Operation has been canceled by the user.");

ig.snapshot().restoreSnapshot(snpName, null).get(getTestTimeout());

assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--status"));

String out = testOut.toString();

assertContains(log, out, "Restore operation for snapshot \"" + snpName + "\" completed successfully");
assertContains(log, out, "Cache groups: " + DEFAULT_CACHE_NAME);
assertContains(log, out, "Progress: 100% completed");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ public static String partDeltaFileName(int partId) {
"The list of names of all snapshots currently saved on the local node with respect to " +
"the configured via IgniteConfiguration snapshot working path.");

restoreCacheGrpProc.registerMetrics();

storeFactory = storeMgr::getPageStoreFactory;

cctx.exchange().registerExchangeAwareComponent(this);
Expand Down Expand Up @@ -884,16 +886,6 @@ public boolean isRestoring() {
return restoreCacheGrpProc.restoringSnapshotName() != null;
}

/**
 * Tells whether a restore from the given snapshot is currently in progress on the local node.
 *
 * @param snpName Snapshot name.
 * @return {@code True} if the locally running restore operation uses the specified snapshot.
 */
public boolean isRestoring(String snpName) {
    // Compare against the name reported by the restore process.
    boolean sameSnp = snpName.equals(restoreCacheGrpProc.restoringSnapshotName());

    return sameSnp;
}

/**
* Check if the cache or group with the specified name is currently being restored from the snapshot.
*
Expand All @@ -906,17 +898,27 @@ public boolean isRestoring(String cacheName, @Nullable String grpName) {
}

/**
* Status of the restore operation cluster-wide.
* Get the status of a cluster-wide restore operation.
*
* @param snpName Snapshot name.
* @return Future that will be completed when the status of the restore operation is received from all the server
* nodes. The result of this future will be {@code false} if the restore process with the specified snapshot name is
* not running on all nodes.
* @return Future that will be completed when the status of the restore operation is received from all server nodes.
* The result of this future is the node ids mapping with restore operation state.
*/
public IgniteFuture<Boolean> restoreStatus(String snpName) {
public IgniteFuture<Map<UUID, SnapshotRestoreStatusDetails>> clusterRestoreStatus(String snpName) {
return executeRestoreManagementTask(SnapshotRestoreStatusTask.class, snpName);
}

/**
 * Returns the status of the most recent snapshot restore operation on the local node.
 *
 * @param snpName Snapshot name.
 * @return Status of the last local restore operation, or {@code null} if the last started operation
 *      was for a snapshot with a different name.
 */
public @Nullable SnapshotRestoreStatusDetails localRestoreStatus(String snpName) {
    // The restore process owns the status; this method only exposes it.
    SnapshotRestoreStatusDetails locStatus = restoreCacheGrpProc.status(snpName);

    return locStatus;
}

/**
* @param restoreId Restore process ID.
* @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when
Expand Down Expand Up @@ -1734,9 +1736,11 @@ static void copy(FileIOFactory factory, File from, File to, long length) {
/**
* @param taskCls Snapshot restore operation management task class.
* @param snpName Snapshot name.
* @param <T> Type of the task result returning from {@link SnapshotRestoreManagementTask#reduce(List)} method.
* @return Task future.
*/
private IgniteFuture<Boolean> executeRestoreManagementTask(
Class<? extends ComputeTask<String, Boolean>> taskCls,
private <T> IgniteFuture<T> executeRestoreManagementTask(
Class<? extends ComputeTask<String, T>> taskCls,
String snpName
) {
cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.mxbean.SnapshotMXBean;

Expand Down Expand Up @@ -47,4 +51,20 @@ public SnapshotMXBeanImpl(GridKernalContext ctx) {
@Override public void cancelSnapshot(String snpName) {
// Delegate to the snapshot manager and call get() on the returned future before returning.
mgr.cancelSnapshot(snpName).get();
}

/** {@inheritDoc} */
@Override public void restoreSnapshot(String name, String grpNames) {
    // No group names supplied: forward null to the manager.
    Set<String> grps = null;

    if (!F.isEmpty(grpNames)) {
        // Split the comma-separated list, trimming each name and dropping blank entries.
        grps = Arrays.stream(grpNames.split(","))
            .map(String::trim)
            .filter(grp -> !grp.isEmpty())
            .collect(Collectors.toSet());
    }

    IgniteFuture<Void> restoreFut = mgr.restoreSnapshot(name, grps);

    // Query the result only if the future has already completed; a still-running restore is not awaited here.
    if (restoreFut.isDone())
        restoreFut.get();
}

/** {@inheritDoc} */
@Override public void cancelSnapshotRestore(String name) {
// Delegate to the snapshot manager and call get() on the returned future; its result value is discarded.
mgr.cancelSnapshotRestore(name).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.util.List;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.resources.IgniteInstanceResource;
Expand All @@ -28,7 +30,7 @@
* Snapshot restore cancel task.
*/
@GridInternal
class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask {
class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask<Boolean> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;

Expand All @@ -44,4 +46,18 @@ class SnapshotRestoreCancelTask extends SnapshotRestoreManagementTask {
}
};
}

/** {@inheritDoc} */
@Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
    boolean anyJobSucceeded = false;

    for (ComputeJobResult jobRes : results) {
        // The first job result carrying an exception fails the whole task, naming the node it ran on.
        if (jobRes.getException() != null) {
            throw new IgniteException("Failed to execute job [nodeId=" + jobRes.getNode().id() + ']',
                jobRes.getException());
        }

        // The task result is true when at least one job returned Boolean.TRUE.
        if (Boolean.TRUE.equals(jobRes.getData()))
            anyJobSucceeded = true;
    }

    return anyJobSucceeded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* Snapshot restore management task.
*/
abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter<String, Boolean> {
abstract class SnapshotRestoreManagementTask<T> extends ComputeTaskAdapter<String, T> {
/**
* @param param Compute job argument.
* @return Compute job.
Expand All @@ -51,20 +51,6 @@ abstract class SnapshotRestoreManagementTask extends ComputeTaskAdapter<String,
return map;
}

/** {@inheritDoc} */
@Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
    boolean reduced = false;

    for (ComputeJobResult res : results) {
        // Any job-level exception aborts the reduction and is rethrown with the node ID attached.
        if (res.getException() != null) {
            throw new IgniteException("Failed to execute job [nodeId=" + res.getNode().id() + ']',
                res.getException());
        }

        // Logical OR over all job results: a single Boolean.TRUE is enough.
        reduced = reduced | Boolean.TRUE.equals(res.getData());
    }

    return reduced;
}

/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
// Handle all exceptions during the `reduce` operation.
Expand Down
Loading