Commits
25 commits
cf5d3fb  IGNITE-10058 Fix draft. (xtern, Nov 14, 2018)
9af319d  IGNITE-10058 Evict lost parts on non-affinity nodes after first excha… (xtern, Nov 16, 2018)
a2e8236  IGNITE-10058 Rework test timeouts (wip). (xtern, Nov 16, 2018)
5e2349c  IGNITE-10058 Improved test and check on rebalance. (xtern, Nov 19, 2018)
b3a264c  IGNITE-10058 Minor. (xtern, Nov 19, 2018)
9e1f3e0  IGNITE-10058 Minor codestyle. (xtern, Nov 19, 2018)
f4484fe  IGNITE-10058 Skip forceAffReassignment. (xtern, Nov 20, 2018)
872361a  IGNITE-10058 Review notes: removed redundant checking. (xtern, Nov 29, 2018)
945c3d8  IGNITE-10058 TC check. (xtern, Dec 4, 2018)
5ba7829  Revert "IGNITE-10058 TC check." (xtern, Dec 4, 2018)
e098747  IGNITE-10058 Revert rebalance changes. (xtern, Dec 6, 2018)
62a7a8d  IGNITE-10058 Fix reset lost parts (in-mem, empty counters). (xtern, Dec 12, 2018)
8695bd5  IGNITE-10058 Fix reset update counters (wip). (xtern, Dec 12, 2018)
4e5b226  IGNITE-10058 rework (wip). (xtern, Dec 12, 2018)
4b4cecf  IGNITE-10058 Reset counters only if owners exists. (xtern, Dec 13, 2018)
d83ec8f  IGNITE-10058 Rework test. (xtern, Dec 13, 2018)
6bdd03e  IGNITE-10058 Fix testReactivateGridBeforeResetLostPartitions (lostpar… (xtern, Dec 14, 2018)
f296972  IGNITE-10058 flaky failure (wip). (xtern, Dec 14, 2018)
b276763  flaky failure (wip) (xtern, Dec 15, 2018)
1157382  IGNITE-10058 Flaky test mass run. (xtern, Dec 15, 2018)
05cf838  debug (xtern, Dec 16, 2018)
069e2b6  wip (xtern, Dec 17, 2018)
a747894  Eviction race fix. (xtern, Dec 17, 2018)
f1d0138  wip (xtern, Dec 17, 2018)
7a22957  IGNITE-10058 wipwipwip (xtern, Dec 17, 2018)
Diff view
@@ -59,7 +59,11 @@ public enum PartitionLossPolicy {
/**
* All reads and writes will proceed as if all partitions were in a consistent state. The result of reading
* from a lost partition is undefined and may be different on different nodes in the cluster.
*
* @deprecated Since 2.8. This policy can lead to scenarios that are impossible to resolve with the current
* architecture.
*/
@Deprecated
READ_WRITE_ALL,

/**
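As background for the deprecation above: the loss policy is a per-cache setting. A minimal sketch, assuming the public Ignite 2.x API and not taken from this patch (cache name and backup count are illustrative), of configuring a cache with the non-deprecated READ_WRITE_SAFE policy instead:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.configuration.CacheConfiguration;

public class PartitionLossPolicySketch {
    public static void main(String[] args) {
        // Illustrative cache name and backup count.
        CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>("default");

        ccfg.setBackups(1);

        // READ_WRITE_SAFE makes reads and writes on lost partitions fail until
        // Ignite#resetLostPartitions() is called, instead of returning undefined data.
        ccfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);

        Ignite ignite = Ignition.start();

        IgniteCache<Integer, String> cache = ignite.getOrCreateCache(ccfg);

        cache.put(1, "value");
    }
}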
@@ -263,7 +263,7 @@ private void onCacheGroupStopped(AffinityTopologyVersion topVer) {
* @param top Topology.
* @param checkGrpId Group ID.
*/
void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) {
public void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) {
CacheAffinityChangeMessage msg = null;

synchronized (mux) {
@@ -1551,6 +1551,8 @@ void decrementSize(int cacheId) {

/** {@inheritDoc} */
@Override public void updateSize(int cacheId, long delta) {
// U.dumpStack("update size " + ctx.cache().cacheDescriptor(cacheId).cacheName() + "< delta = " + delta);

storageSize.addAndGet(delta);

if (grp.sharedGroup()) {
@@ -1449,6 +1449,14 @@ private void distributedExchange() throws IgniteCheckedException {

try {
if (crd.isLocal()) {
if (exchActions != null) {
Collection<String> caches = exchActions.cachesToResetLostPartitions();

// Reset lost partitions on coordinator before updating cache topology from single messages.
if (!F.isEmpty(caches))
resetLostPartitions(caches);
}

if (remaining.isEmpty())
onAllReceived(null);
}
@@ -1805,6 +1813,8 @@ private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException
true);
}
else {
log.info("single message");

msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
false,
true,
@@ -3245,10 +3255,11 @@ private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndRe
if (exchActions != null) {
assignPartitionsStates();

Set<String> caches = exchActions.cachesToResetLostPartitions();
for (String cache : exchActions.cachesToResetLostPartitions()) {
GridCacheContext ctx = cctx.cacheContext(CU.cacheId(cache));

if (!F.isEmpty(caches))
resetLostPartitions(caches);
cctx.affinity().checkRebalanceState(ctx.topology(), ctx.groupId());
}
}
}
else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
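The cachesToResetLostPartitions() actions processed on the coordinator above originate from the user-facing reset call. A minimal sketch, assuming the public Ignite API (cache name and event wiring are illustrative, not taken from this patch), of detecting lost partitions and then requesting the reset that this exchange path handles:

import java.util.Collections;

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgnitePredicate;

public class ResetLostPartitionsSketch {
    public static void main(String[] args) {
        // Assumption: the node is started with EVT_CACHE_REBALANCE_PART_DATA_LOST
        // included via IgniteConfiguration#setIncludeEventTypes, otherwise no events arrive.
        Ignite ignite = Ignition.start();

        IgnitePredicate<Event> lsnr = evt -> {
            CacheRebalancingEvent e = (CacheRebalancingEvent)evt;

            System.out.println("Partition lost: cache=" + e.cacheName() + ", part=" + e.partition());

            return true; // Keep listening.
        };

        ignite.events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);

        // After the missing data is restored or written off, ask the cluster to reset
        // LOST partitions; this produces the exchange actions handled above.
        ignite.resetLostPartitions(Collections.singleton("default"));
    }
}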
@@ -80,6 +80,18 @@
* Key partition.
*/
public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable {
public static final ConcurrentMap<String, ConcurrentHashMap<Integer, StringBuilder>> MEGAMAP = new ConcurrentHashMap<>();

public void putState(GridDhtPartitionState oldState, GridDhtPartitionState newState) {

if (grp.hasCache("default")) {
String nodeId = ctx.localNodeId().toString();
String nodeIdx = nodeId.substring(nodeId.length() - 3);

MEGAMAP.computeIfAbsent(nodeIdx, (v) -> new ConcurrentHashMap<>()).computeIfAbsent(id, v -> new StringBuilder(oldState.toString())).append(" -> ").append(newState);
}
}

/** */
private static final GridCacheMapEntryFactory ENTRY_FACTORY = GridDhtCacheEntry::new;

@@ -559,6 +571,8 @@ private boolean casState(long state, GridDhtPartitionState toState) {
boolean update = this.state.compareAndSet(state, setPartState(state, toState));

if (update) {
putState(prevState, toState);

try {
ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter()));
}
@@ -582,6 +596,8 @@ private boolean casState(long state, GridDhtPartitionState toState) {
boolean update = this.state.compareAndSet(state, setPartState(state, toState));

if (update) {
putState(prevState, toState);

if (log.isDebugEnabled())
log.debug("Partition changed state [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]");
@@ -665,6 +681,8 @@ public IgniteInternalFuture<?> rent(boolean updateSeq) {
delayedRenting = true;

if (getReservations(state0) == 0 && casState(state0, RENTING)) {
log.info(id() + " - " + RENTING + " clear async");

delayedRenting = false;

// Evict asynchronously, as the 'rent' method may be called
@@ -854,7 +872,7 @@ public void awaitDestroy() {
if (state() != EVICTED)
return;

final long timeout = 10_000;
final long timeout = 30_000;

for (;;) {
try {
@@ -1007,6 +1025,8 @@ public long updateCounter() {
* @param val Update counter value.
*/
public void updateCounter(long val) {
log.info(" update cntr " + id() + ", cntr=" + val);

store.updateCounter(val);
}

@@ -1031,7 +1051,12 @@ public void initialUpdateCounter(long val) {
* @return Update counter value before update.
*/
public long getAndIncrementUpdateCounter(long delta) {
return store.getAndIncrementUpdateCounter(delta);
long cntr = store.getAndIncrementUpdateCounter(delta);

// if (cntr == 0)
log.info(" update cntr " + id() + ", cntr=" + cntr);

return cntr;
}

/**
Expand All @@ -1041,6 +1066,9 @@ public long getAndIncrementUpdateCounter(long delta) {
* @param delta Delta.
*/
public void updateCounter(long start, long delta) {
// if (start == 0)
// log.info(" update cntr " + id() + ", start=" + start + ", delta=" + delta);

store.updateCounter(start, delta);
}

@@ -1057,6 +1057,9 @@ else if (loc != null && state == RENTING && !showRenting) {
if (part == null)
continue;

if (part.state() == RENTING)
log.info(">xxx> observing RENTING " + part.id());

map.put(i, part.state());
}

@@ -2127,13 +2130,37 @@ else if (plc != PartitionLossPolicy.IGNORE) {
lock.writeLock().lock();

try {
// LOST partitions that have at least one owner.
Set<Integer> hasOwner = new HashSet<>();

for (GridDhtLocalPartition part : localPartitions()) {
if (part.state() != LOST)
continue;

for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
if (e.getValue().get(part.id()) != OWNING)
continue;

assert !ctx.localNodeId().equals(e.getKey());

hasOwner.add(part.id());

break;
}
}

long updSeq = updateSeq.incrementAndGet();

for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
if (e0.getValue() != LOST)
continue;

// AffinityAssignment assignment = grp.affinity().cachedAffinity(resTopVer);
//
// if (!assignment.idealAssignment().get(e0.getKey()).contains(ctx.discovery().node(e.getKey())))
// continue;

e0.setValue(OWNING);

GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false);
@@ -2146,8 +2173,17 @@ else if (plc != PartitionLossPolicy.IGNORE) {

long updateCntr = locPart.updateCounter();

//Set update counters to 0, for full rebalance.
locPart.updateCounter(updateCntr, -updateCntr);
if (hasOwner.contains(locPart.id())) {
// Set update counters to 0, for full rebalance.
log.info(">xxx> " + grp.cacheOrGroupName() + " there is owner for " + locPart.id() + " reset counter 0");

locPart.updateCounter(updateCntr, -updateCntr);
}
else
log.info(">xxx> " + grp.cacheOrGroupName() + " there is no owner for " + locPart.id() + ", keeping update cntr = " + updateCntr);

assert locPart.initialUpdateCounter() == 0 : locPart.initialUpdateCounter();

locPart.initialUpdateCounter(0);
}
}
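The hunk above resets a lost partition's update counter to zero only when some other node still owns that partition, so a full rebalance can repopulate it; with no owner left, the local counter is kept. A simplified standalone model of that rule (types and names are illustrative, not Ignite classes):

import java.util.Map;
import java.util.Set;

/** Illustrative stand-in for per-partition update counters on the local node. */
final class LostCounterResetSketch {
    /**
     * @param lostParts Ids of partitions that are LOST locally.
     * @param partsWithRemoteOwner Lost partitions that are still OWNING on at least one other node.
     * @param updCntrs Mutable map of partition id to local update counter.
     */
    static void onResetLostPartitions(Set<Integer> lostParts, Set<Integer> partsWithRemoteOwner,
        Map<Integer, Long> updCntrs) {
        for (Integer p : lostParts) {
            // An owner exists somewhere: zero the counter so the partition is fully rebalanced.
            if (partsWithRemoteOwner.contains(p))
                updCntrs.put(p, 0L);
            // No owner anywhere: keep the counter, there is no node to rebalance from.
        }
    }
}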
@@ -2260,6 +2296,19 @@ else if (plc != PartitionLossPolicy.IGNORE) {
}
}

// if (lostParts != null) {
// for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
// if (e.getKey().equals(ctx.localNodeId()))
// continue;
//
// for (Integer part : lostParts) {
// GridDhtPartitionState state = e.getValue().get(part);
// if (state != null && state.active())
// e.getValue().put(part, LOST);
// }
// }
// }

node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet());
} finally {
lock.writeLock().unlock();
@@ -2344,7 +2393,7 @@ private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
if (nodeIds.containsAll(F.nodeIds(affNodes))) {
GridDhtPartitionState state0 = part.state();

IgniteInternalFuture<?> rentFut = part.rent(false);
IgniteInternalFuture<?> rentFut = part.rent(true);

rentingFutures.add(rentFut);

@@ -2375,7 +2424,7 @@ private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
if (locId.equals(n.id())) {
GridDhtPartitionState state0 = part.state();

IgniteInternalFuture<?> rentFut = part.rent(false);
IgniteInternalFuture<?> rentFut = part.rent(true);

rentingFutures.add(rentFut);

@@ -2415,6 +2464,7 @@ private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
log.debug("Partitions have been scheduled to resend [reason=" +
"Evictions are done [grp" + grp.cacheOrGroupName() + "]");

log.info("schedule resend");
ctx.exchange().scheduleResendPartitions();
}
finally {