Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,7 @@ private GridCacheGateway<K, V> checkProxyIsValid(@Nullable GridCacheGateway<K, V
* @return Previous projection set on this thread.
*/
private CacheOperationGate onEnter() {
GridCacheGateway<K, V> gate = checkProxyIsValid(gate(), true);
GridCacheGateway<K, V> gate = checkProxyIsValid(gate(), context().localNode().isClient());

return new CacheOperationGate(gate,
lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,15 +1116,23 @@ private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws I
if (msg.error() != null && cctx.kernalContext().isStopping())
return false;

if (msg.messageId() < 0)
// Generate and set message ID.
msg.messageId(idGen.incrementAndGet());
asdas

if (destNodeId == null || !cctx.localNodeId().equals(destNodeId)) {
msg.prepareMarshal(cctx);
try {
if (msg.messageId() < 0)
// Generate and set message ID.
msg.messageId(idGen.incrementAndGet());

if (destNodeId == null || !cctx.localNodeId().equals(destNodeId)) {
msg.prepareMarshal(cctx);

if (msg instanceof GridCacheDeployable && msg.addDeploymentInfo())
cctx.deploy().prepare((GridCacheDeployable)msg);
if (msg instanceof GridCacheDeployable && msg.addDeploymentInfo())
cctx.deploy().prepare((GridCacheDeployable)msg);
}
} catch (RuntimeException e) {
log.error(e.getMessage(), e);//.printStackTrace();

return false;
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,7 @@ private void updateAllAsyncInternal0(
top.readLock();

try {
if (top.stopping()) {
if (top.stopping() || context().gate().isStopped()) {
res.addFailedKeys(req.keys(), new CacheStoppedException(name()));

completionCb.apply(req, res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicU
@Override protected void mapOnTopology() {
AffinityTopologyVersion topVer;

if (cache.topology().stopping()) {
if (cache.topology().stopping() || cache.context().gate().isStopped()) {
completeFuture(null,new CacheStoppedException(
cache.name()),
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicU
@Override protected void mapOnTopology() {
AffinityTopologyVersion topVer;

if (cache.topology().stopping()) {
if (cache.topology().stopping() || cache.context().gate().isStopped()) {
completeFuture(null,new CacheStoppedException(cache.name()), null);

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
cctx.topology().readLock();

try {
if (cctx.topology().stopping()) {
if (cctx.topology().stopping() || cctx.gate().isStopped()) {
onDone(new CacheStoppedException(cctx.name()));

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ synchronized void mapOnTopology(final boolean remap) {
cctx.topology().readLock();

try {
if (cctx.topology().stopping()) {
if (cctx.topology().stopping() || cctx.gate().isStopped()) {
onDone(new CacheStoppedException(cctx.name()));

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private void mapOnTopology() {
cctx.topology().readLock();

try {
if (cctx.topology().stopping()) {
if (cctx.topology().stopping() || cctx.gate().isStopped()) {
onDone(new CacheStoppedException(cctx.name()));

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private void acquireTopologyVersion() {
cctx.topology().readLock();

try {
if (cctx.topology().stopping()) {
if (cctx.topology().stopping() || cctx.gate().isStopped()) {
onDone(new CacheStoppedException(cctx.name()));

return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {

cacheCtx.topology().readLock();

if (cacheCtx.topology().stopping()) {
if (cacheCtx.topology().stopping() || cacheCtx.gate().isStopped()) {
fut.onDone(new CacheStoppedException(cacheCtx.name()));

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {

nonLocCtx.topology().readLock();

if (nonLocCtx.topology().stopping()) {
if (nonLocCtx.topology().stopping() || nonLocCtx.gate().isStopped()) {
fut.onDone(new CacheStoppedException(nonLocCtx.name()));

return null;
Expand Down
59 changes: 59 additions & 0 deletions modules/core/src/test/java/org/apache/ignite/DestroyCacheTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.apache.ignite;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;


public class DestroyCacheTest extends GridCommonAbstractTest {

private CacheConfiguration<Integer, Boolean> ccfg(String name, String grp) {
return new CacheConfiguration<Integer, Boolean>(name).setCacheMode(CacheMode.PARTITIONED)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setGroupName(grp);
}

public void testDestroyAsync() throws Exception {
String grpName = "testGroup";

try (IgniteEx node = startGrid(0)) {
node.createCache(ccfg("cache2", grpName));

for (int n = 0; n < 100; n++) {
IgniteCache<Integer, Boolean> cache1 = node.createCache(ccfg("cache1", grpName));

AtomicInteger cntr = new AtomicInteger();

GridTestUtils.runMultiThreadedAsync(() -> {
try {
int key;

while ((key = cntr.getAndIncrement()) < 10_000) {
if (key == 1000)
cache1.destroy();

cache1.putIfAbsent(key, true);
}
}
catch (Exception ignore) {
log.warning(ignore.getMessage());
}

return null;
}, 6, "put-thread").get();
}

// System.out.println("HANGED THREADS: " + acq.size());
//
// for (Thread t : acq.values())
// U.dumpStack(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.cache.CacheAtomicityMode;

/**
* Test ensures that the put operation does not hang during asynchronous cache destroy.
*/
public abstract class IgniteCacheAtomicPutOnDestroyTest extends IgniteCachePutOnDestroyTest {
/** {@inheritDoc} */
@Override public CacheAtomicityMode atomicityMode() {
return CacheAtomicityMode.ATOMIC;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;

/**
* Test ensures that the put operation does not hang during asynchronous cache destroy.
*/
public abstract class IgniteCachePutOnDestroyTest extends GridCommonAbstractTest {
/** Iteration count. */
protected static final int ITER_CNT = 50;

/** Grid count. */
private static final int GRID_CNT = 2;

/** Worker threads timeout. */
private static final int TIMEOUT = 10_000;

/**
* @param cacheName Cache name.
* @return Cache configuration.
*/
protected <K, V> CacheConfiguration<K, V> cacheConfiguration(String cacheName, String grpName) {
CacheConfiguration<K, V> cfg = new CacheConfiguration<>();

cfg.setName(cacheName);
cfg.setGroupName(grpName);
cfg.setAtomicityMode(atomicityMode());

return cfg;
}

/**
* @return Cache atomicity mode.
*/
protected abstract CacheAtomicityMode atomicityMode();

/**
* @return Cache mode.
*/
protected abstract CacheMode cacheMode();

/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGrids(GRID_CNT);
}

/**
* @throws IgniteCheckedException If failed.
*/
public void testPutOnCacheDestroy() throws IgniteCheckedException {
for (int n = 0; n < ITER_CNT; n++)
doTestPutOnCacheDestroy(null, null);
}

/**
* @param concurrency Transaction concurrency level.
* @param isolation Transaction isolation level.
* @throws IgniteCheckedException If failed.
*/
protected void doTestPutOnCacheDestroy(TransactionConcurrency concurrency,
TransactionIsolation isolation) throws IgniteCheckedException {
String grpName = "testGroup";

boolean explicitTx = concurrency != null && isolation != null;

Ignite ignite = grid(0);

IgniteCache additionalCache = ignite.createCache(cacheConfiguration("cache1", grpName));

try {
IgniteCache<Integer, Boolean> cache = ignite.getOrCreateCache(cacheConfiguration("cache2", grpName));

AtomicInteger cntr = new AtomicInteger();

GridTestUtils.runMultiThreadedAsync(() -> {
try {
int key;

while ((key = cntr.getAndIncrement()) < 2_000) {
if (key == 1_000) {
cache.destroy();

break;
}

if (explicitTx) {
try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
cache.put(key, true);

tx.commit();
}
}
else
cache.put(key, true);
}
}
catch (Exception e) {
if (!hasCacheStoppedMessage(e)) {

//assertTrue(X.getFullStackTrace(e), );
fail("fuck off");
}
}

return null;
}, 6, "put-thread").get(TIMEOUT);
}
finally {
additionalCache.destroy();
}
}

/**
* Validate exception.
*
* @param e Exception.
* @return {@code True} if exception (or cause) is instance of {@link CacheStoppedException} or
* {@link IllegalStateException} and message contains "cache" and "stopped" keywords.
*/
private boolean hasCacheStoppedMessage(Throwable e) {
for (Throwable t : X.getThrowableList(e)) {
if (t.getClass() == CacheStoppedException.class || t.getClass() == IllegalStateException.class) {
String errMsg = t.getMessage().toLowerCase();

if (errMsg.contains("cache") && errMsg.contains("stopped"))
return true;

// for (Throwable t0 : X.getSuppressedList(t))
// if (hasCacheStoppedMessage(t0))
// return true;
}
}

for (Throwable t : X.getSuppressedList(e)) {
if (t.getClass() == CacheStoppedException.class || t.getClass() == IllegalStateException.class) {
String errMsg = t.getMessage().toLowerCase();

if (errMsg.contains("cache") && errMsg.contains("stopped"))
return true;

// for (Throwable t0 : X.getSuppressedList(t))
// if (hasCacheStoppedMessage(t0))
// return true;
}
}

return false;
}
}
Loading