Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotListeners;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
Expand Down Expand Up @@ -268,6 +270,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Distributed process to restore cache group from the snapshot. */
private final SnapshotRestoreProcess restoreCacheGrpProc;

// todo something like pipeline
private final SnapshotListeners snpLsnrs = new SnapshotListeners();

/** Resolved persistent data storage settings. */
private volatile PdsFolderSettings pdsSettings;

Expand Down Expand Up @@ -381,6 +386,15 @@ public static String partDeltaFileName(int partId) {
U.ensureDirectory(locSnpDir, "snapshot work directory", log);
U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log);

snpLsnrs.register(new SnapshotConsistencyCheckOnRestore(ctx));

SnapshotLifecycleListener[] lsnrs = ctx.plugins().extensions(SnapshotLifecycleListener.class);

if (lsnrs != null) {
for (SnapshotLifecycleListener lsnr : lsnrs)
snpLsnrs.register(lsnr);
}

MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);

mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime,
Expand Down Expand Up @@ -538,6 +552,10 @@ public void deleteSnapshot(File snpDir, String folderName) {
}
}

public SnapshotListeners listeners() {
return snpLsnrs;
}

/**
* @param snpName Snapshot name.
* @return Local snapshot directory for snapshot with given name.
Expand Down Expand Up @@ -597,6 +615,13 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartSt
"prior to snapshot operation start: " + leftNodes));
}

// try {
// for (SnapshotLifecycleListener lsnr : lifecycleListeners)
// lsnr.beforeCreateSnapshot(cctx.localNode(), req);
// } catch (IgniteCheckedException e) {
// return new GridFinishedFuture<>(e);
// }

List<Integer> grpIds = new ArrayList<>(F.viewReadOnly(req.groups(), CU::cacheId));

Set<Integer> leftGrps = new HashSet<>(grpIds);
Expand Down Expand Up @@ -669,6 +694,15 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartSt
log.info("Snapshot metafile has been created: " + smf.getAbsolutePath());
}

File snpDir = snapshotLocalDir(req.snapshotName());

Path binaryWorkDir = binaryWorkDir(snpDir.getAbsolutePath(), pdsSettings.folderName()).toPath();
Path marshallerDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath()).toPath();
Path cacheDir = Paths.get(snpDir.toString(), DB_DEFAULT_FOLDER, pdsSettings.folderName());

for (SnapshotLifecycleListener<?> lsnr : listeners().list())
lsnr.afterCreate(req.snapshotName(), smf, binaryWorkDir, marshallerDir, cacheDir);

return new SnapshotOperationResponse();
}
catch (IOException | IgniteCheckedException e) {
Expand Down Expand Up @@ -960,6 +994,37 @@ public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
});
}

/**
* @param name Snapshot name.
* @return Future with snapshot metadata obtained from nodes.
*/
IgniteInternalFuture<Map<ClusterNode, List<SnapshotMetadata>>> collectSnapshotMetadata(String name) {
GridKernalContext kctx0 = cctx.kernalContext();

kctx0.security().authorize(ADMIN_SNAPSHOT);

Collection<ClusterNode> bltNodes = F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
(node) -> CU.baselineNode(node, kctx0.state().clusterState()));

kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
kctx0.task().setThreadContext(TC_SUBGRID, bltNodes);

return kctx0.task().execute(SnapshotMetadataCollectorTask.class, name);
}

// /**
// * @param metas Nodes snapshot metadata.
// * @return Future with the verification results.
// */
// IgniteInternalFuture<IdleVerifyResultV2> runSnapshotVerification(Map<ClusterNode, List<SnapshotMetadata>> metas) {
// GridKernalContext kctx0 = cctx.kernalContext();
//
// kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
// kctx0.task().setThreadContext(TC_SUBGRID, new ArrayList<>(metas.keySet()));
//
// return kctx0.task().execute(SnapshotPartitionsVerifyTask.class, metas);
// }

/**
* The check snapshot procedure performs compute operation over the whole cluster to verify the snapshot
* entirety and partitions consistency. The result future will be completed with an exception if this
Expand Down Expand Up @@ -1069,7 +1134,7 @@ public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
* @param smf File denoting to snapshot metafile.
* @return Snapshot metadata instance.
*/
private SnapshotMetadata readSnapshotMetadata(File smf) {
public SnapshotMetadata readSnapshotMetadata(File smf) {
if (!smf.exists())
throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);

Expand Down Expand Up @@ -1358,7 +1423,7 @@ public void onCacheGroupsStopped(List<Integer> grps) {
* @param consId Consistent node id.
* @return Snapshot metadata file name.
*/
private static String snapshotMetaFileName(String consId) {
public static String snapshotMetaFileName(String consId) {
return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;

public class SnapshotConsistencyCheckOnRestore implements SnapshotLifecycleListener<HashMap<PartitionKeyV2, PartitionHashRecordV2>> {
/** */
private final GridKernalContext ctx;

/** */
private final IgniteLogger log;

@Override public int priority() {
return Integer.MIN_VALUE;
}

public SnapshotConsistencyCheckOnRestore(GridKernalContext ctx) {
this.ctx = ctx;

log = ctx.log(getClass());
}

/** {@inheritDoc} */
@Override public HashMap<PartitionKeyV2, PartitionHashRecordV2> beforeRestore(
String name,
@Nullable Collection<String> rqGrps,
File metadata,
Path binaryDir,
Path marshallerDir,
Path cacheDir
) throws IgniteCheckedException {
IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();

SnapshotMetadata meta = snpMgr.readSnapshotMetadata(metadata);

if (log.isInfoEnabled()) {
log.info("Verify snapshot partitions procedure has been initiated " +
"[snpName=" + name + ", consId=" + meta.consistentId() + ']');
}
Set<Integer> grps = F.isEmpty(rqGrps) ? new HashSet<>(meta.partitions().keySet()) :
rqGrps.stream().map(CU::cacheId).collect(Collectors.toSet());
Set<File> partFiles = new HashSet<>();

for (File dir : snpMgr.snapshotCacheDirectories(name, meta.folderName())) {
int grpId = CU.cacheId(cacheGroupName(dir));

if (!grps.remove(grpId))
continue;

Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId));

for (File part : cachePartitionFiles(dir)) {
int partId = partId(part.getName());

if (!parts.remove(partId))
continue;

partFiles.add(part);
}

if (!parts.isEmpty()) {
throw new IgniteException("Snapshot data doesn't contain required cache group partition " +
"[grpId=" + grpId + ", snpName=" + name + ", consId=" + meta.consistentId() +
", missed=" + parts + ", meta=" + meta + ']');
}
}

if (!grps.isEmpty()) {
throw new IgniteException("Snapshot data doesn't contain required cache groups " +
"[grps=" + grps + ", snpName=" + name + ", consId=" + meta.consistentId() +
", meta=" + meta + ']');
}

Map<PartitionKeyV2, PartitionHashRecordV2> res = new ConcurrentHashMap<>();
ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize())
.order(ByteOrder.nativeOrder()));

// try {
GridKernalContext snpCtx = snpMgr.createStandaloneKernalContext(name, meta.folderName());

for (GridComponent comp : snpCtx)
comp.start();

try {
U.doInParallel(
snpMgr.snapshotExecutorService(),
partFiles,
part -> {
String grpName = cacheGroupName(part.getParentFile());
int grpId = CU.cacheId(grpName);
int partId = partId(part.getName());

FilePageStoreManager storeMgr = (FilePageStoreManager)ctx.cache().context().pageStore();

try (FilePageStore pageStore = (FilePageStore)storeMgr.getPageStoreFactory(grpId, false)
.createPageStore(getTypeByPartId(partId),
part::toPath,
val -> {
})
) {
if (partId == INDEX_PARTITION) {
checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX);

return null;
}

if (grpId == MetaStorage.METASTORAGE_CACHE_ID) {
checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA);

return null;
}

ByteBuffer pageBuff = buff.get();
pageBuff.clear();
pageStore.read(0, pageBuff, true);

long pageAddr = GridUnsafe.bufferAddress(pageBuff);

PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr));

if (partState != OWNING) {
throw new IgniteCheckedException("Snapshot partitions must be in the OWNING " +
"state only: " + partState);
}

long updateCntr = io.getUpdateCounter(pageAddr);
long size = io.getSize(pageAddr);

if (log.isDebugEnabled()) {
log.debug("Partition [grpId=" + grpId
+ ", id=" + partId
+ ", counter=" + updateCntr
+ ", size=" + size + "]");
}

// Snapshot partitions must always be in OWNING state.
// There is no `primary` partitions for snapshot.
PartitionKeyV2 key = new PartitionKeyV2(grpId, partId, grpName);

PartitionHashRecordV2 hash = calculatePartitionHash(key,
updateCntr,
meta.consistentId(),
GridDhtPartitionState.OWNING,
false,
size,
snpMgr.partitionRowIterator(snpCtx, grpName, partId, pageStore));

assert hash != null : "OWNING must have hash: " + key;

res.put(key, hash);
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}

return null;
}
);
}
finally {
for (GridComponent comp : snpCtx)
comp.stop(true);
}
// }
// catch (IgniteCheckedException e) {
// throw new IgniteException(e);
// }

return new HashMap<>(res);
}

/** {@inheritDoc} */
@Override public void handlePreRestoreResults(String name, @Nullable Collection<String> grps,
Map<ClusterNode, HashMap<PartitionKeyV2, PartitionHashRecordV2>> res,
Map<ClusterNode, Exception> errs) throws IgniteCheckedException {
Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new HashMap<>();

for (Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes : res.values()) {
for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e : nodeHashes.entrySet()) {
List<PartitionHashRecordV2> records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());

records.add(e.getValue());
}
}

IdleVerifyResultV2 verifyResult = new IdleVerifyResultV2(clusterHashes, errs);

if (!F.isEmpty(errs) || verifyResult.hasConflicts()) {
StringBuilder sb = new StringBuilder();

verifyResult.print(sb::append, true);

throw new IgniteCheckedException(sb.toString());
}
}
}
Loading