diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index f9ea6a5734d9a1..550f61ded5f2d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -100,6 +100,8 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotLifecycleListener; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotListeners; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; @@ -268,6 +270,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Distributed process to restore cache group from the snapshot. */ private final SnapshotRestoreProcess restoreCacheGrpProc; + // todo something like pipeline + private final SnapshotListeners snpLsnrs = new SnapshotListeners(); + /** Resolved persistent data storage settings. */ private volatile PdsFolderSettings pdsSettings; @@ -381,6 +386,15 @@ public static String partDeltaFileName(int partId) { U.ensureDirectory(locSnpDir, "snapshot work directory", log); U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log); + snpLsnrs.register(new SnapshotConsistencyCheckOnRestore(ctx)); + + SnapshotLifecycleListener[] lsnrs = ctx.plugins().extensions(SnapshotLifecycleListener.class); + + if (lsnrs != null) { + for (SnapshotLifecycleListener lsnr : lsnrs) + snpLsnrs.register(lsnr); + } + MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS); mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime, @@ -538,6 +552,10 @@ public void deleteSnapshot(File snpDir, String folderName) { } } + public SnapshotListeners listeners() { + return snpLsnrs; + } + /** * @param snpName Snapshot name. * @return Local snapshot directory for snapshot with given name. @@ -597,6 +615,13 @@ private IgniteInternalFuture initLocalSnapshotStartSt "prior to snapshot operation start: " + leftNodes)); } +// try { +// for (SnapshotLifecycleListener lsnr : lifecycleListeners) +// lsnr.beforeCreateSnapshot(cctx.localNode(), req); +// } catch (IgniteCheckedException e) { +// return new GridFinishedFuture<>(e); +// } + List grpIds = new ArrayList<>(F.viewReadOnly(req.groups(), CU::cacheId)); Set leftGrps = new HashSet<>(grpIds); @@ -669,6 +694,15 @@ private IgniteInternalFuture initLocalSnapshotStartSt log.info("Snapshot metafile has been created: " + smf.getAbsolutePath()); } + File snpDir = snapshotLocalDir(req.snapshotName()); + + Path binaryWorkDir = binaryWorkDir(snpDir.getAbsolutePath(), pdsSettings.folderName()).toPath(); + Path marshallerDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath()).toPath(); + Path cacheDir = Paths.get(snpDir.toString(), DB_DEFAULT_FOLDER, pdsSettings.folderName()); + + for (SnapshotLifecycleListener lsnr : listeners().list()) + lsnr.afterCreate(req.snapshotName(), smf, binaryWorkDir, marshallerDir, cacheDir); + return new SnapshotOperationResponse(); } catch (IOException | IgniteCheckedException e) { @@ -960,6 +994,37 @@ public IgniteInternalFuture checkSnapshot(String name) { }); } + /** + * @param name Snapshot name. + * @return Future with snapshot metadata obtained from nodes. + */ + IgniteInternalFuture>> collectSnapshotMetadata(String name) { + GridKernalContext kctx0 = cctx.kernalContext(); + + kctx0.security().authorize(ADMIN_SNAPSHOT); + + Collection bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE), + (node) -> CU.baselineNode(node, kctx0.state().clusterState())); + + kctx0.task().setThreadContext(TC_SKIP_AUTH, true); + kctx0.task().setThreadContext(TC_SUBGRID, bltNodes); + + return kctx0.task().execute(SnapshotMetadataCollectorTask.class, name); + } + +// /** +// * @param metas Nodes snapshot metadata. +// * @return Future with the verification results. +// */ +// IgniteInternalFuture runSnapshotVerification(Map> metas) { +// GridKernalContext kctx0 = cctx.kernalContext(); +// +// kctx0.task().setThreadContext(TC_SKIP_AUTH, true); +// kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet())); +// +// return kctx0.task().execute(SnapshotPartitionsVerifyTask.class, metas); +// } + /** * The check snapshot procedure performs compute operation over the whole cluster to verify the snapshot * entirety and partitions consistency. The result future will be completed with an exception if this @@ -1069,7 +1134,7 @@ public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) { * @param smf File denoting to snapshot metafile. * @return Snapshot metadata instance. */ - private SnapshotMetadata readSnapshotMetadata(File smf) { + public SnapshotMetadata readSnapshotMetadata(File smf) { if (!smf.exists()) throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf); @@ -1358,7 +1423,7 @@ public void onCacheGroupsStopped(List grps) { * @param consId Consistent node id. * @return Snapshot metadata file name. */ - private static String snapshotMetaFileName(String consId) { + public static String snapshotMetaFileName(String consId) { return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotConsistencyCheckOnRestore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotConsistencyCheckOnRestore.java new file mode 100644 index 00000000000000..22fd4698345ca4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotConsistencyCheckOnRestore.java @@ -0,0 +1,242 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotLifecycleListener; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; +import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId; +import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId; +import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash; +import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum; + +public class SnapshotConsistencyCheckOnRestore implements SnapshotLifecycleListener> { + /** */ + private final GridKernalContext ctx; + + /** */ + private final IgniteLogger log; + + @Override public int priority() { + return Integer.MIN_VALUE; + } + + public SnapshotConsistencyCheckOnRestore(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(getClass()); + } + + /** {@inheritDoc} */ + @Override public HashMap beforeRestore( + String name, + @Nullable Collection rqGrps, + File metadata, + Path binaryDir, + Path marshallerDir, + Path cacheDir + ) throws IgniteCheckedException { + IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr(); + + SnapshotMetadata meta = snpMgr.readSnapshotMetadata(metadata); + + if (log.isInfoEnabled()) { + log.info("Verify snapshot partitions procedure has been initiated " + + "[snpName=" + name + ", consId=" + meta.consistentId() + ']'); + } + Set grps = F.isEmpty(rqGrps) ? new HashSet<>(meta.partitions().keySet()) : + rqGrps.stream().map(CU::cacheId).collect(Collectors.toSet()); + Set partFiles = new HashSet<>(); + + for (File dir : snpMgr.snapshotCacheDirectories(name, meta.folderName())) { + int grpId = CU.cacheId(cacheGroupName(dir)); + + if (!grps.remove(grpId)) + continue; + + Set parts = new HashSet<>(meta.partitions().get(grpId)); + + for (File part : cachePartitionFiles(dir)) { + int partId = partId(part.getName()); + + if (!parts.remove(partId)) + continue; + + partFiles.add(part); + } + + if (!parts.isEmpty()) { + throw new IgniteException("Snapshot data doesn't contain required cache group partition " + + "[grpId=" + grpId + ", snpName=" + name + ", consId=" + meta.consistentId() + + ", missed=" + parts + ", meta=" + meta + ']'); + } + } + + if (!grps.isEmpty()) { + throw new IgniteException("Snapshot data doesn't contain required cache groups " + + "[grps=" + grps + ", snpName=" + name + ", consId=" + meta.consistentId() + + ", meta=" + meta + ']'); + } + + Map res = new ConcurrentHashMap<>(); + ThreadLocal buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize()) + .order(ByteOrder.nativeOrder())); + +// try { + GridKernalContext snpCtx = snpMgr.createStandaloneKernalContext(name, meta.folderName()); + + for (GridComponent comp : snpCtx) + comp.start(); + + try { + U.doInParallel( + snpMgr.snapshotExecutorService(), + partFiles, + part -> { + String grpName = cacheGroupName(part.getParentFile()); + int grpId = CU.cacheId(grpName); + int partId = partId(part.getName()); + + FilePageStoreManager storeMgr = (FilePageStoreManager)ctx.cache().context().pageStore(); + + try (FilePageStore pageStore = (FilePageStore)storeMgr.getPageStoreFactory(grpId, false) + .createPageStore(getTypeByPartId(partId), + part::toPath, + val -> { + }) + ) { + if (partId == INDEX_PARTITION) { + checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX); + + return null; + } + + if (grpId == MetaStorage.METASTORAGE_CACHE_ID) { + checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA); + + return null; + } + + ByteBuffer pageBuff = buff.get(); + pageBuff.clear(); + pageStore.read(0, pageBuff, true); + + long pageAddr = GridUnsafe.bufferAddress(pageBuff); + + PagePartitionMetaIO io = PageIO.getPageIO(pageBuff); + GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr)); + + if (partState != OWNING) { + throw new IgniteCheckedException("Snapshot partitions must be in the OWNING " + + "state only: " + partState); + } + + long updateCntr = io.getUpdateCounter(pageAddr); + long size = io.getSize(pageAddr); + + if (log.isDebugEnabled()) { + log.debug("Partition [grpId=" + grpId + + ", id=" + partId + + ", counter=" + updateCntr + + ", size=" + size + "]"); + } + + // Snapshot partitions must always be in OWNING state. + // There is no `primary` partitions for snapshot. + PartitionKeyV2 key = new PartitionKeyV2(grpId, partId, grpName); + + PartitionHashRecordV2 hash = calculatePartitionHash(key, + updateCntr, + meta.consistentId(), + GridDhtPartitionState.OWNING, + false, + size, + snpMgr.partitionRowIterator(snpCtx, grpName, partId, pageStore)); + + assert hash != null : "OWNING must have hash: " + key; + + res.put(key, hash); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return null; + } + ); + } + finally { + for (GridComponent comp : snpCtx) + comp.stop(true); + } +// } +// catch (IgniteCheckedException e) { +// throw new IgniteException(e); +// } + + return new HashMap<>(res); + } + + /** {@inheritDoc} */ + @Override public void handlePreRestoreResults(String name, @Nullable Collection grps, + Map> res, + Map errs) throws IgniteCheckedException { + Map> clusterHashes = new HashMap<>(); + + for (Map nodeHashes : res.values()) { + for (Map.Entry e : nodeHashes.entrySet()) { + List records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>()); + + records.add(e.getValue()); + } + } + + IdleVerifyResultV2 verifyResult = new IdleVerifyResultV2(clusterHashes, errs); + + if (!F.isEmpty(errs) || verifyResult.hasConflicts()) { + StringBuilder sb = new StringBuilder(); + + verifyResult.print(sb::append, true); + + throw new IgniteCheckedException(sb.toString()); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java index 0e99e8cfc094c7..61e19980872291 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java @@ -17,62 +17,25 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; -import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeJobResultPolicy; import org.apache.ignite.compute.ComputeTaskAdapter; -import org.apache.ignite.internal.GridComponent; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; -import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; -import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; -import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; -import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2; import org.apache.ignite.internal.processors.task.GridInternal; -import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA; -import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; -import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; -import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; -import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId; -import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId; -import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash; -import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum; import static org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2.reduce0; /** @@ -159,198 +122,4 @@ public class SnapshotPartitionsVerifyTask return ComputeJobResultPolicy.WAIT; } - /** Job that collects update counters of snapshot partitions on the node it executes. */ - private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter { - /** Serial version uid. */ - private static final long serialVersionUID = 0L; - - /** Ignite instance. */ - @IgniteInstanceResource - private IgniteEx ignite; - - /** Injected logger. */ - @LoggerResource - private IgniteLogger log; - - /** Snapshot name to validate. */ - private final String snpName; - - /** Consistent snapshot metadata file name. */ - private final String consId; - - /** Set of cache groups to be checked in the snapshot or {@code empty} to check everything. */ - private final Set rqGrps; - - /** - * @param snpName Snapshot name to validate. - * @param consId Consistent snapshot metadata file name. - * @param rqGrps Set of cache groups to be checked in the snapshot or {@code empty} to check everything. - */ - public VisorVerifySnapshotPartitionsJob(String snpName, String consId, Collection rqGrps) { - this.snpName = snpName; - this.consId = consId; - - this.rqGrps = rqGrps == null ? Collections.emptySet() : new HashSet<>(rqGrps); - } - - @Override public Map execute() throws IgniteException { - IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr(); - - if (log.isInfoEnabled()) { - log.info("Verify snapshot partitions procedure has been initiated " + - "[snpName=" + snpName + ", consId=" + consId + ']'); - } - - SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consId); - Set grps = rqGrps.isEmpty() ? new HashSet<>(meta.partitions().keySet()) : - rqGrps.stream().map(CU::cacheId).collect(Collectors.toSet()); - Set partFiles = new HashSet<>(); - - for (File dir : snpMgr.snapshotCacheDirectories(snpName, meta.folderName())) { - int grpId = CU.cacheId(cacheGroupName(dir)); - - if (!grps.remove(grpId)) - continue; - - Set parts = new HashSet<>(meta.partitions().get(grpId)); - - for (File part : cachePartitionFiles(dir)) { - int partId = partId(part.getName()); - - if (!parts.remove(partId)) - continue; - - partFiles.add(part); - } - - if (!parts.isEmpty()) { - throw new IgniteException("Snapshot data doesn't contain required cache group partition " + - "[grpId=" + grpId + ", snpName=" + snpName + ", consId=" + consId + - ", missed=" + parts + ", meta=" + meta + ']'); - } - } - - if (!grps.isEmpty()) { - throw new IgniteException("Snapshot data doesn't contain required cache groups " + - "[grps=" + grps + ", snpName=" + snpName + ", consId=" + consId + - ", meta=" + meta + ']'); - } - - Map res = new ConcurrentHashMap<>(); - ThreadLocal buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize()) - .order(ByteOrder.nativeOrder())); - - try { - GridKernalContext snpCtx = snpMgr.createStandaloneKernalContext(snpName, meta.folderName()); - - for (GridComponent comp : snpCtx) - comp.start(); - - try { - U.doInParallel( - snpMgr.snapshotExecutorService(), - partFiles, - part -> { - String grpName = cacheGroupName(part.getParentFile()); - int grpId = CU.cacheId(grpName); - int partId = partId(part.getName()); - - FilePageStoreManager storeMgr = (FilePageStoreManager)ignite.context().cache().context().pageStore(); - - try (FilePageStore pageStore = (FilePageStore)storeMgr.getPageStoreFactory(grpId, false) - .createPageStore(getTypeByPartId(partId), - part::toPath, - val -> { - }) - ) { - if (partId == INDEX_PARTITION) { - checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX); - - return null; - } - - if (grpId == MetaStorage.METASTORAGE_CACHE_ID) { - checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA); - - return null; - } - - ByteBuffer pageBuff = buff.get(); - pageBuff.clear(); - pageStore.read(0, pageBuff, true); - - long pageAddr = GridUnsafe.bufferAddress(pageBuff); - - PagePartitionMetaIO io = PageIO.getPageIO(pageBuff); - GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr)); - - if (partState != OWNING) { - throw new IgniteCheckedException("Snapshot partitions must be in the OWNING " + - "state only: " + partState); - } - - long updateCntr = io.getUpdateCounter(pageAddr); - long size = io.getSize(pageAddr); - - if (log.isDebugEnabled()) { - log.debug("Partition [grpId=" + grpId - + ", id=" + partId - + ", counter=" + updateCntr - + ", size=" + size + "]"); - } - - // Snapshot partitions must always be in OWNING state. - // There is no `primary` partitions for snapshot. - PartitionKeyV2 key = new PartitionKeyV2(grpId, partId, grpName); - - PartitionHashRecordV2 hash = calculatePartitionHash(key, - updateCntr, - consId, - GridDhtPartitionState.OWNING, - false, - size, - snpMgr.partitionRowIterator(snpCtx, grpName, partId, pageStore)); - - assert hash != null : "OWNING must have hash: " + key; - - res.put(key, hash); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - return null; - } - ); - } - finally { - for (GridComponent comp : snpCtx) - comp.stop(true); - } - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - VisorVerifySnapshotPartitionsJob job = (VisorVerifySnapshotPartitionsJob)o; - - return snpName.equals(job.snpName) && consId.equals(job.consId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return Objects.hash(snpName, consId); - } - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index 5061532d9e65d7..fd40063ac28e44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -53,7 +53,8 @@ import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture; -import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.RestoreHandleTask; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotLifecycleListener; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.distributed.DistributedProcess; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -72,9 +73,12 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE; import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK; import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START; +import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_SNAPSHOT; /** * Distributed process to restore cache group from the snapshot. @@ -151,7 +155,7 @@ protected void cleanup() throws IgniteCheckedException { * @return Future that will be completed when the restore operation is complete and the cache groups are started. */ public IgniteFuture start(String snpName, @Nullable Collection cacheGrpNames) { - ClusterSnapshotFuture fut0; + ClusterSnapshotFuture fut0 = null; try { if (ctx.clientNode()) @@ -179,33 +183,28 @@ public IgniteFuture start(String snpName, @Nullable Collection cac fut0 = fut; } - } - catch (IgniteException e) { - return new IgniteFinishedFutureImpl<>(e); - } - ctx.cache().context().snapshotMgr().checkSnapshot(snpName, cacheGrpNames).listen(f -> { - if (f.error() != null) { - finishProcess(fut0.rqId, f.error()); + // todo - async execution + Map> metas = + ctx.cache().context().snapshotMgr().collectSnapshotMetadata(snpName).get(); - return; - } + Iterable> lsnrs = ctx.cache().context().snapshotMgr().listeners().list(); - if (!F.isEmpty(f.result().exceptions())) { - finishProcess(fut0.rqId, F.first(f.result().exceptions().values())); + if (!F.isEmpty(lsnrs)) { + ctx.security().authorize(ADMIN_SNAPSHOT); - return; - } + Collection bltNodes = F.view(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE), + (node) -> CU.baselineNode(node, ctx.state().clusterState())); - if (fut0.interruptEx != null) { - finishProcess(fut0.rqId, fut0.interruptEx); + ctx.task().setThreadContext(TC_SKIP_AUTH, true); + ctx.task().setThreadContext(TC_SUBGRID, bltNodes); - return; + ctx.task().execute(RestoreHandleTask.class, new SnapshotPartitionsVerifyTaskArg(cacheGrpNames, metas)).get(); } +// Map> metas = f.result().metas(); Set dataNodes = new HashSet<>(); Set snpBltNodes = null; - Map> metas = f.result().metas(); Map reqGrpIds = cacheGrpNames == null ? Collections.emptyMap() : cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v)); @@ -229,14 +228,14 @@ public IgniteFuture start(String snpName, @Nullable Collection cac finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "No snapshot data " + "has been found [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']')); - return; + return new IgniteFutureImpl<>(fut0); } if (!reqGrpIds.isEmpty()) { finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "Cache group(s) was not " + "found in the snapshot [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']')); - return; + return new IgniteFutureImpl<>(fut0); } Collection bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE), @@ -248,26 +247,105 @@ public IgniteFuture start(String snpName, @Nullable Collection cac finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " + "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']')); - return; - } - - IdleVerifyResultV2 res = f.result().idleVerifyResult(); - - if (!F.isEmpty(res.exceptions()) || res.hasConflicts()) { - StringBuilder sb = new StringBuilder(); - - res.print(sb::append, true); - - finishProcess(fut0.rqId, new IgniteException(sb.toString())); - - return; + return new IgniteFutureImpl<>(fut0); } SnapshotOperationRequest req = new SnapshotOperationRequest( fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes); prepareRestoreProc.start(req.requestId(), req); - }); + } + catch (IgniteException | IgniteCheckedException e) { + if (fut0 != null) + finishProcess(fut0.rqId, e); + + return new IgniteFinishedFutureImpl<>(e); + } + +// ctx.cache().context().snapshotMgr().checkSnapshot(snpName, cacheGrpNames).listen(f -> { +// if (f.error() != null) { +// finishProcess(fut0.rqId, f.error()); +// +// return; +// } +// +// if (!F.isEmpty(f.result().exceptions())) { +// finishProcess(fut0.rqId, F.first(f.result().exceptions().values())); +// +// return; +// } +// +// if (fut0.interruptEx != null) { +// finishProcess(fut0.rqId, fut0.interruptEx); +// +// return; +// } +// +// IdleVerifyResultV2 res = f.result().idleVerifyResult(); +// +// if (!F.isEmpty(res.exceptions()) || res.hasConflicts()) { +// StringBuilder sb = new StringBuilder(); +// +// res.print(sb::append, true); +// +// finishProcess(fut0.rqId, new IgniteException(sb.toString())); +// +// return; +// } +// +// Map> metas = f.result().metas(); +// Set dataNodes = new HashSet<>(); +// Set snpBltNodes = null; +// Map reqGrpIds = cacheGrpNames == null ? Collections.emptyMap() : +// cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v)); +// +// for (Map.Entry> entry : metas.entrySet()) { +// SnapshotMetadata meta = F.first(entry.getValue()); +// +// assert meta != null : entry.getKey().id(); +// +// if (!entry.getKey().consistentId().toString().equals(meta.consistentId())) +// continue; +// +// if (snpBltNodes == null) +// snpBltNodes = new HashSet<>(meta.baselineNodes()); +// +// dataNodes.add(entry.getKey().id()); +// +// reqGrpIds.keySet().removeAll(meta.partitions().keySet()); +// } +// +// if (snpBltNodes == null) { +// finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "No snapshot data " + +// "has been found [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']')); +// +// return; +// } +// +// if (!reqGrpIds.isEmpty()) { +// finishProcess(fut0.rqId, new IllegalArgumentException(OP_REJECT_MSG + "Cache group(s) was not " + +// "found in the snapshot [groups=" + reqGrpIds.values() + ", snapshot=" + snpName + ']')); +// +// return; +// } +// +// Collection bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE), +// node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState())); +// +// snpBltNodes.removeAll(bltNodes); +// +// if (!snpBltNodes.isEmpty()) { +// finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " + +// "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']')); +// +// return; +// } +// +// SnapshotOperationRequest req = new SnapshotOperationRequest( +// fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes); +// +// prepareRestoreProc.start(req.requestId(), req); +// }); return new IgniteFutureImpl<>(fut0); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotVerifier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotVerifier.java new file mode 100644 index 00000000000000..e96106fdb112fb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotVerifier.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.plugin.Extension; + +/** + * Optional user-defined integrity check of the snapshot. + */ +public interface SnapshotVerifier extends Extension { + /** + * Performs an optional user-defined integrity check of the snapshot before restoring it. + * + * @param metas The map of distribution of snapshot metadata pieces across the cluster. + * @param grps Cache groups to be restored or {@code null} if all cache groups are being restored.. + * @throws IgniteCheckedException If the check fails. + */ + public void verify(Map> metas, Collection grps) throws IgniteCheckedException; +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/VisorVerifySnapshotPartitionsJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/VisorVerifySnapshotPartitionsJob.java new file mode 100644 index 00000000000000..9f08da9ea1cd68 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/VisorVerifySnapshotPartitionsJob.java @@ -0,0 +1,241 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; + +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId; +import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId; +import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash; +import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum; + +/** Job that collects update counters of snapshot partitions on the node it executes. */ +class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Ignite instance. */ + @IgniteInstanceResource + private IgniteEx ignite; + + /** Injected logger. */ + @LoggerResource + private IgniteLogger log; + + /** Snapshot name to validate. */ + private final String snpName; + + /** Consistent snapshot metadata file name. */ + private final String consId; + + /** Set of cache groups to be checked in the snapshot or {@code empty} to check everything. */ + private final Set rqGrps; + + /** + * @param snpName Snapshot name to validate. + * @param consId Consistent snapshot metadata file name. + * @param rqGrps Set of cache groups to be checked in the snapshot or {@code empty} to check everything. + */ + public VisorVerifySnapshotPartitionsJob(String snpName, String consId, Collection rqGrps) { + this.snpName = snpName; + this.consId = consId; + + this.rqGrps = rqGrps == null ? Collections.emptySet() : new HashSet<>(rqGrps); + } + + @Override public Map execute() throws IgniteException { + IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr(); + + if (log.isInfoEnabled()) { + log.info("Verify snapshot partitions procedure has been initiated " + + "[snpName=" + snpName + ", consId=" + consId + ']'); + } + + SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consId); + Set grps = rqGrps.isEmpty() ? new HashSet<>(meta.partitions().keySet()) : + rqGrps.stream().map(CU::cacheId).collect(Collectors.toSet()); + Set partFiles = new HashSet<>(); + + for (File dir : snpMgr.snapshotCacheDirectories(snpName, meta.folderName())) { + int grpId = CU.cacheId(cacheGroupName(dir)); + + if (!grps.remove(grpId)) + continue; + + Set parts = new HashSet<>(meta.partitions().get(grpId)); + + for (File part : cachePartitionFiles(dir)) { + int partId = partId(part.getName()); + + if (!parts.remove(partId)) + continue; + + partFiles.add(part); + } + + if (!parts.isEmpty()) { + throw new IgniteException("Snapshot data doesn't contain required cache group partition " + + "[grpId=" + grpId + ", snpName=" + snpName + ", consId=" + consId + + ", missed=" + parts + ", meta=" + meta + ']'); + } + } + + if (!grps.isEmpty()) { + throw new IgniteException("Snapshot data doesn't contain required cache groups " + + "[grps=" + grps + ", snpName=" + snpName + ", consId=" + consId + + ", meta=" + meta + ']'); + } + + Map res = new ConcurrentHashMap<>(); + ThreadLocal buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize()) + .order(ByteOrder.nativeOrder())); + + try { + GridKernalContext snpCtx = snpMgr.createStandaloneKernalContext(snpName, meta.folderName()); + + for (GridComponent comp : snpCtx) + comp.start(); + + try { + U.doInParallel( + snpMgr.snapshotExecutorService(), + partFiles, + part -> { + String grpName = cacheGroupName(part.getParentFile()); + int grpId = CU.cacheId(grpName); + int partId = partId(part.getName()); + + FilePageStoreManager storeMgr = (FilePageStoreManager)ignite.context().cache().context().pageStore(); + + try (FilePageStore pageStore = (FilePageStore)storeMgr.getPageStoreFactory(grpId, false) + .createPageStore(getTypeByPartId(partId), + part::toPath, + val -> { + }) + ) { + if (partId == INDEX_PARTITION) { + checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX); + + return null; + } + + if (grpId == MetaStorage.METASTORAGE_CACHE_ID) { + checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA); + + return null; + } + + ByteBuffer pageBuff = buff.get(); + pageBuff.clear(); + pageStore.read(0, pageBuff, true); + + long pageAddr = GridUnsafe.bufferAddress(pageBuff); + + PagePartitionMetaIO io = PageIO.getPageIO(pageBuff); + GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr)); + + if (partState != OWNING) { + throw new IgniteCheckedException("Snapshot partitions must be in the OWNING " + + "state only: " + partState); + } + + long updateCntr = io.getUpdateCounter(pageAddr); + long size = io.getSize(pageAddr); + + if (log.isDebugEnabled()) { + log.debug("Partition [grpId=" + grpId + + ", id=" + partId + + ", counter=" + updateCntr + + ", size=" + size + "]"); + } + + // Snapshot partitions must always be in OWNING state. + // There is no `primary` partitions for snapshot. + PartitionKeyV2 key = new PartitionKeyV2(grpId, partId, grpName); + + PartitionHashRecordV2 hash = calculatePartitionHash(key, + updateCntr, + consId, + GridDhtPartitionState.OWNING, + false, + size, + snpMgr.partitionRowIterator(snpCtx, grpName, partId, pageStore)); + + assert hash != null : "OWNING must have hash: " + key; + + res.put(key, hash); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return null; + } + ); + } + finally { + for (GridComponent comp : snpCtx) + comp.stop(true); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + VisorVerifySnapshotPartitionsJob job = (VisorVerifySnapshotPartitionsJob)o; + + return snpName.equals(job.snpName) && consId.equals(job.consId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(snpName, consId); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/RestoreHandleTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/RestoreHandleTask.java new file mode 100644 index 00000000000000..8166cdb463cb66 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/RestoreHandleTask.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle; + +import java.io.File; +import java.io.Serializable; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeTaskAdapter; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotVerifyException; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskArg; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir; +import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir; +import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.DB_DEFAULT_FOLDER; + +public class RestoreHandleTask extends ComputeTaskAdapter> { + /** Ignite instance. */ + @IgniteInstanceResource + private IgniteEx ignite; + + /** Task argument. */ + private final Map> metas = new HashMap<>(); + + /** Cache groups to be restored ({@code null} if all cache groups are restored from the snapshot). */ + private Collection grps; + + /** {@inheritDoc} */ + @Override public @NotNull Map map(List subgrid, + SnapshotPartitionsVerifyTaskArg arg) throws IgniteException { + + grps = arg.cacheGroupNames(); + + Map> clusterMetas = arg.clusterMetadata(); + + if (!subgrid.containsAll(clusterMetas.keySet())) { + throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(), + new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " + + "[curr=" + F.viewReadOnly(subgrid, F.node2id()) + + ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']'))); + } + + Map jobs = new HashMap<>(); + Set allMetas = new HashSet<>(); + clusterMetas.values().forEach(allMetas::addAll); + + Set missed = null; + + for (SnapshotMetadata meta : allMetas) { + if (missed == null) + missed = new HashSet<>(meta.baselineNodes()); + + missed.remove(meta.consistentId()); + + if (missed.isEmpty()) + break; + } + + if (!missed.isEmpty()) { + throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(), + new IgniteException("Some metadata is missing from the snapshot: " + missed))); + } + + metas.putAll(clusterMetas); + + while (!allMetas.isEmpty()) { + for (Map.Entry> e : clusterMetas.entrySet()) { + SnapshotMetadata meta = F.find(e.getValue(), null, allMetas::remove); + + if (meta == null) + continue; + + jobs.put(new RestoreHandleJob(meta.snapshotName(), meta.consistentId(), arg.cacheGroupNames()), + e.getKey()); + + if (allMetas.isEmpty()) + break; + } + } + + return jobs; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map reduce(List results) throws IgniteException { + // re-map + Map, Map>> resMap = new HashMap<>(); + + for (ComputeJobResult res : results) { + // Unahndled exception. + if (res.getException() != null) + throw res.getException(); + + Map nodeDataMap = res.getData(); + + for (Map.Entry entry : nodeDataMap.entrySet()) { + String lsnrName = entry.getKey(); + + T2, Map> mapPair = + resMap.computeIfAbsent(lsnrName, v -> new T2<>(new HashMap<>(), new HashMap<>())); + + Serializable val = entry.getValue(); + + if (val instanceof Exception) + mapPair.get2().put(res.getNode(), (Exception)val); + else + mapPair.get1().put(res.getNode(), val); + } + } + + // todo validate count of listeners on nodes + String snpName = F.first(F.first(metas.values())).snapshotName(); + + for (SnapshotLifecycleListener lsnr : ignite.context().cache().context().snapshotMgr().listeners().list()) { + T2, Map> mapPair = resMap.get(lsnr.getClass().getSimpleName()); + + // The listener might have been enabled at runtime. + if (mapPair == null) + continue; + + try { + lsnr.handlePreRestoreResults(snpName, grps, mapPair.get1(), mapPair.get2()); + } catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + return null; + } + + private static class RestoreHandleJob extends ComputeJobAdapter { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Ignite instance. */ + @IgniteInstanceResource + private IgniteEx ignite; + + /** Injected logger. */ + @LoggerResource + private IgniteLogger log; + + String snpName; + + String consistentId; + + Collection grps; + + public RestoreHandleJob(String snpName, String consistentId, Collection grps) { + this.snpName = snpName; + this.consistentId = consistentId; + this.grps = grps; + } + + /** {@inheritDoc} */ + @Override public Map execute() throws IgniteException { + // todo expand restore hadnler separately + Map resMap = new HashMap<>(); + IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr(); + SnapshotListeners lsnrs = snpMgr.listeners(); + File snpDir = snpMgr.snapshotLocalDir(snpName); + File smf = new File(snpDir, IgniteSnapshotManager.snapshotMetaFileName(consistentId)); + + try { + String pdsFolderName = ignite.context().pdsFolderResolver().resolveFolders().folderName(); + + Path binaryWorkDir = binaryWorkDir(snpDir.getAbsolutePath(), pdsFolderName).toPath(); + Path marshallerDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath()).toPath(); + Path cacheDir = Paths.get(snpDir.toString(), DB_DEFAULT_FOLDER, pdsFolderName); + + for (SnapshotLifecycleListener lsnr : lsnrs.list()) { + Serializable res; + + try { + res = lsnr.beforeRestore(snpName, grps, smf, binaryWorkDir, marshallerDir, cacheDir); + } + catch (Exception e) { + res = e; + } + + resMap.put(lsnr.getClass().getSimpleName(), res); + } + } catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + return resMap; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotLifecycleListener.java new file mode 100644 index 00000000000000..9ff2dbccda5784 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotLifecycleListener.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle; + +import java.io.File; +import java.io.Serializable; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.plugin.Extension; +import org.jetbrains.annotations.Nullable; + +/** + * Snapshot lifecycle listener. + */ +public interface SnapshotLifecycleListener extends Extension { + /** Listener name. */ + public default String name() { + return getClass().getSimpleName(); + } + + /** Listener invocation priority (ascending order is used). */ + public default int priority() { + return 0; + } + + /** + * Called locally after the snapshot files have been created on the node. + * + * @param name Snapshot name. + * @param metadata Snapshot metadata file. + * @param binaryDir Snapshot binary metadata directory. + * @param marshallerDir Snapshot marshaller data directory. + * @param cacheDir Snapshot cache data directory. + * @throws IgniteCheckedException If failed. + */ + public default void afterCreate( + String name, + File metadata, + Path binaryDir, + Path marshallerDir, + Path cacheDir + ) throws IgniteCheckedException { + // No-op. + } + + /** + * Called locally before restore snapshot files. + * + * @param name Snapshot name. + * @param grps Cache groups to be restored ({@code null} if all cache groups are restored from the snapshot). + * @param metadata Snapshot metadata file. + * @param binaryDir Snapshot binary metadata directory. + * @param marshallerDir Snapshot marshaller data directory. + * @param cacheDir Snapshot cache data directory. + * + * @return Local node result, or {@code null} if cluster-wide aggregation is not required. + * @throws IgniteCheckedException If failed. + */ + @Nullable public default T beforeRestore( + String name, + @Nullable Collection grps, + File metadata, + Path binaryDir, + Path marshallerDir, + Path cacheDir + ) throws IgniteCheckedException { + return null; + } + + /** + * Process the results of a pre-restore operation across the cluster. + * + * @param name Snapshot name. + * @param grps Cache groups to be restored ({@code null} if all cache groups are restored from the snapshot). + * @param res Results from all nodes. + * @param errs Errors from all nodes. + * @throws IgniteCheckedException If failed. + */ + public default void handlePreRestoreResults( + String name, + @Nullable Collection grps, + Map res, + Map errs + ) throws IgniteCheckedException { + Map.Entry errEntry = F.first(errs.entrySet()); + + if (errEntry == null) + return; + + throw new IgniteCheckedException("Snapshot restore handler " + name() + + " has failed on node " + errEntry.getKey().id() + '.', errEntry.getValue()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotListeners.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotListeners.java new file mode 100644 index 00000000000000..787791e229f08a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotListeners.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.F; + +public class SnapshotListeners { +// private final IgnitePluginProcessor plugins; + private final Map lsnrsState = new HashMap<>(); + private final List> lsnrsByPriority = new ArrayList<>(); + +// public SnapshotListeners(IgnitePluginProcessor plugins) { +//// this.plugins = plugins; +// } + + public Iterable> list() { + return F.viewReadOnly(lsnrsByPriority, lsnr -> lsnr, lsnr -> lsnrsState.get(lsnr.name())); + } + +// public @Nullable SnapshotLifecycleListener find(String name) { +// T2 lsnrState = lsnrsByName.get(name); +// +// if (lsnrState == null) +// return null; +// +// return lsnrState.get1() ? lsnrState.get2() : null; +// } + + public void register(SnapshotLifecycleListener lsnr) { + if (lsnrsState.putIfAbsent(lsnr.name(), true) != null) + throw new IllegalArgumentException("Listener " + lsnr.name() + " is already registered."); + + int idx = Collections.binarySearch(lsnrsByPriority, lsnr, Comparator.comparingInt(SnapshotLifecycleListener::priority)); + + if (idx < 0) + lsnrsByPriority.add(-idx - 1, lsnr); + else + lsnrsByPriority.add(idx, lsnr); + } + + public boolean disable(String name) { + Boolean enabled = lsnrsState.get(name); + + if (!Boolean.TRUE.equals(enabled)) + return false; + + return lsnrsState.put(name, false); + } + + public boolean enable(String name) { + Boolean enabled = lsnrsState.get(name); + + if (!Boolean.FALSE.equals(enabled)) + return false; + + return !lsnrsState.put(name, true); +// +// SnapshotLifecycleListener lsnr = lsnrsState.get(name); +// +// if (lsnr == null) +// throw new IllegalArgumentException("No listener with name " + lsnr.name() + " has been registered."); +// +// for (String ) +// +// for (SnapshotLifecycleListener lsnr : lsnrsState.values()) { +// if (name.equals(lsnr.name())) { +// register(lsnr); +// +// return true; +// } +// } +// +// return false; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotLifecycleListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotLifecycleListenerTest.java new file mode 100644 index 00000000000000..4b2f9683db4fdc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotLifecycleListenerTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.io.Serializable; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotLifecycleListener; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotListeners; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; + +public class IgniteClusterSnapshotLifecycleListenerTest extends IgniteClusterSnapshotRestoreBaseTest { + /** */ + private List extensions = new ArrayList<>(); + + private SnapshotLifecyclePluginProvider testPluginProvider = new SnapshotLifecyclePluginProvider(extensions); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName).setPluginProviders(testPluginProvider); + } + + /** @throws Exception If fails. */ + @Test + public void testRestoreWithMissedPart() throws Exception { + IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(), + getPartitionFileName(0)); + + assertNotNull(part0); + assertTrue(part0.toString(), part0.toFile().exists()); + assertTrue(part0.toFile().delete()); + + for (Ignite g : G.allGrids()) { + SnapshotListeners lsnrs = ((IgniteEx)g).context().cache().context().snapshotMgr().listeners(); + + boolean disabled = lsnrs.disable(SnapshotConsistencyCheckOnRestore.class.getSimpleName()); + assertTrue(disabled); + } + + ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(); + + ignite.cache(DEFAULT_CACHE_NAME).destroy(); + + awaitPartitionMapExchange(); + + ensureCacheAbsent(dfltCacheCfg); + + for (Ignite g : G.allGrids()) + ((IgniteEx)g).context().cache().context().snapshotMgr().listeners().enable(SnapshotConsistencyCheckOnRestore.class.getSimpleName()); + + assertThrowsAnyCause( + log, + () -> ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(), + IgniteException.class, + "Snapshot data doesn't contain required cache group partition" + ); + } + + /** @throws Exception If fails. */ + @Test + public void testClusterSnapshotOptionalVerificationFailure() throws Exception { + String expMsg = "Test verification exception message."; + + extensions.add(new SnapshotLifecycleListener() { + @Override public String name() { + return "listener-1"; + } + + @Nullable @Override public Serializable beforeRestore(String name, @Nullable Collection grps, File metadata, + Path binaryDir, Path marshallerDir, Path cacheDir) throws IgniteCheckedException { + throw new IgniteCheckedException(expMsg); + } + }); + + extensions.add(new SnapshotLifecycleListener() { + @Override public String name() { + return "listener-2"; + } + + @Nullable @Override public Serializable beforeRestore(String name, @Nullable Collection grps, File metadata, + Path binaryDir, Path marshallerDir, Path cacheDir) throws IgniteCheckedException { + return null; + } + }); + + IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + IgniteFuture fut = ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteCheckedException.class, expMsg); + } + + /** @throws Exception If fails. */ + @Test + public void testClusterSnapshotListenerPriority() throws Exception { + AtomicReference checkCntr = new AtomicReference<>(0); + + List ints = IntStream.range(0, 100).boxed().collect(Collectors.toList()); + + Collections.shuffle(ints); + + for (int num : ints) { + extensions.add(new SnapshotLifecycleListener() { + @Override public int priority() { + return num; + } + + @Override public String name() { + return "listener-" + num; + } + + @Nullable @Override public Integer beforeRestore(String name, @Nullable Collection grps, File metadata, + Path binaryDir, Path marshallerDir, Path cacheDir) throws IgniteCheckedException { + assertEquals(checkCntr.get().intValue(), num); + + checkCntr.set(num + 1); + + return null; + } + }); + } + + IgniteEx ignite = startGridsWithSnapshot(1, CACHE_KEYS_RANGE); + + ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(); + } + + private static class SnapshotLifecyclePluginProvider extends AbstractTestPluginProvider { + private final List extensions; + + public SnapshotLifecyclePluginProvider(List extensions) { + this.extensions = extensions; + } + + @Override public String name() { + return "SnapshotVerifier"; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { + for (SnapshotLifecycleListener lsnr : extensions) + registry.registerExtension(SnapshotLifecycleListener.class, lsnr); + } + }; + + /** {@inheritDoc} */ + @Override protected Function valueBuilder() { + return Integer::new; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java index 18d02f5242b157..95970554e6e785 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java @@ -17,10 +17,22 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; +import java.io.File; +import java.util.Arrays; import java.util.function.Function; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.testframework.GridTestUtils; /** * Snapshot restore test base. @@ -29,6 +41,13 @@ public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnaps /** Cache value builder. */ protected abstract Function valueBuilder(); + protected final int PARTS_NUMBER = GridTestUtils.SF.apply(512); + + /** {@inheritDoc} */ + @Override protected CacheConfiguration txCacheConfig(CacheConfiguration ccfg) { + return super.txCacheConfig(ccfg).setAffinity(new RendezvousAffinityFunction(false, PARTS_NUMBER)); + } + /** * @param nodesCnt Nodes count. * @param keysCnt Number of keys to create. @@ -72,6 +91,38 @@ protected void assertCacheKeys(IgniteCache cache, int keysCnt) { assertEquals(valueBuilder().apply(i), cache.get(i)); } + /** + * @param ccfg Cache configuration. + * @throws IgniteCheckedException if failed. + */ + protected void ensureCacheAbsent(CacheConfiguration ccfg) throws IgniteCheckedException { + String cacheName = ccfg.getName(); + + for (Ignite ignite : G.allGrids()) { + GridKernalContext kctx = ((IgniteEx)ignite).context(); + + if (kctx.clientNode()) + continue; + + CacheGroupDescriptor desc = kctx.cache().cacheGroupDescriptors().get(CU.cacheId(cacheName)); + + assertNull("nodeId=" + kctx.localNodeId() + ", cache=" + cacheName, desc); + + boolean success = GridTestUtils.waitForCondition( + () -> !kctx.cache().context().snapshotMgr().isRestoring(), + TIMEOUT); + + assertTrue("The process has not finished on the node " + kctx.localNodeId(), success); + + File dir = ((FilePageStoreManager)kctx.cache().context().pageStore()).cacheWorkDir(ccfg); + + String errMsg = String.format("%s, dir=%s, exists=%b, files=%s", + ignite.name(), dir, dir.exists(), Arrays.toString(dir.list())); + + assertTrue(errMsg, !dir.exists() || dir.list().length == 0); + } + } + /** */ protected class BinaryValueBuilder implements Function { /** Binary type name. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java index c4d6a7ee6f614f..78d4b04b24f8c7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.OpenOption; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; @@ -64,6 +65,11 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; @@ -72,9 +78,11 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX; import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE; import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.runAsync; /** @@ -114,6 +122,32 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR return valBuilder; } + /** @throws Exception If fails. */ + @Test + public void testRestoreWithMissedPart() throws Exception { + IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE); + + Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(), + getPartitionFileName(0)); + + assertNotNull(part0); + assertTrue(part0.toString(), part0.toFile().exists()); + assertTrue(part0.toFile().delete()); + + assertThrowsAnyCause( + log, + () -> ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(), + IgniteException.class, + "Snapshot data doesn't contain required cache group partition" + ); + + ensureCacheAbsent(dfltCacheCfg); + + for (Ignite g : G.allGrids()) { + + } + } + /** @throws Exception If failed. */ @Test public void testRestoreAllGroups() throws Exception { @@ -712,36 +746,6 @@ private void checkClusterStateChange( assertCacheKeys(ignite.cache(cacheName), CACHE_KEYS_RANGE); } - /** - * @param ccfg Cache configuration. - * @throws IgniteCheckedException if failed. - */ - private void ensureCacheAbsent(CacheConfiguration ccfg) throws IgniteCheckedException { - String cacheName = ccfg.getName(); - - for (Ignite ignite : G.allGrids()) { - GridKernalContext kctx = ((IgniteEx)ignite).context(); - - if (kctx.clientNode()) - continue; - - CacheGroupDescriptor desc = kctx.cache().cacheGroupDescriptors().get(CU.cacheId(cacheName)); - - assertNull("nodeId=" + kctx.localNodeId() + ", cache=" + cacheName, desc); - - GridTestUtils.waitForCondition( - () -> !kctx.cache().context().snapshotMgr().isRestoring(), - TIMEOUT); - - File dir = ((FilePageStoreManager)kctx.cache().context().pageStore()).cacheWorkDir(ccfg); - - String errMsg = String.format("%s, dir=%s, exists=%b, files=%s", - ignite.name(), dir, dir.exists(), Arrays.toString(dir.list())); - - assertTrue(errMsg, !dir.exists() || dir.list().length == 0); - } - } - /** * @param spi Test communication spi. * @param restorePhase The type of distributed process on which communication is blocked.