Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ public void end(int flags) throws XAException {
} finally {
this.suspended = suspended;
this.ended = ended;
this.started = false;
if (ended) {
this.started = false;
}
}
}

Expand Down
27 changes: 5 additions & 22 deletions btm/src/main/java/bitronix/tm/internal/XAResourceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void suspend() throws XAException {
for (XAResourceHolderState xaResourceHolderState : resources) {
if (!xaResourceHolderState.isEnded()) {
if (log.isDebugEnabled()) log.debug("suspending " + xaResourceHolderState);
xaResourceHolderState.end(XAResource.TMSUCCESS);
xaResourceHolderState.end(XAResource.TMSUSPEND);
}
} // while
}
Expand All @@ -156,29 +156,12 @@ public void suspend() throws XAException {
* @throws XAException if the resource threw an exception during resume.
*/
public void resume() throws XAException {
// all XAResource needs to be re-enlisted but this must happen
// outside the Scheduler's iteration as enlist() can change the
// collection's content and confuse the iterator.
List<XAResourceHolderState> toBeReEnlisted = new ArrayList<XAResourceHolderState>();

for (XAResourceHolderState xaResourceHolderState : resources) {
if (log.isDebugEnabled()) log.debug("resuming " + xaResourceHolderState);

// If a prepared statement is (re-)used after suspend/resume is performed its XAResource needs to be
// re-enlisted. This must be done outside this loop or that will confuse the iterator!
toBeReEnlisted.add(new XAResourceHolderState(xaResourceHolderState));
}

if (toBeReEnlisted.size() > 0 && log.isDebugEnabled()) log.debug("re-enlisting " + toBeReEnlisted.size() + " resource(s)");
for (XAResourceHolderState xaResourceHolderState : toBeReEnlisted) {
if (log.isDebugEnabled()) log.debug("re-enlisting resource " + xaResourceHolderState);
try {
enlist(xaResourceHolderState);
xaResourceHolderState.getXAResourceHolder().putXAResourceHolderState(xaResourceHolderState.getXid(), xaResourceHolderState);
} catch (BitronixSystemException ex) {
throw new BitronixXAException("error re-enlisting resource during resume: " + xaResourceHolderState, XAException.XAER_RMERR, ex);
if (!xaResourceHolderState.isEnded()) {
if (log.isDebugEnabled()) log.debug("resuming " + xaResourceHolderState);
xaResourceHolderState.start(XAResource.TMRESUME);
}
}
} // while
}

/**
Expand Down
16 changes: 16 additions & 0 deletions btm/src/main/java/bitronix/tm/resource/jdbc/lrc/LrcXAResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ public boolean isSameRM(XAResource xaResource) throws XAException {
}

public void start(Xid xid, int flag) throws XAException {
if (flag == XAResource.TMRESUME) {
if (state != STARTED) {
throw new BitronixXAException("resource not started on XID " + this.xid, XAException.XAER_PROTO);
}
log.warn("Resume not supported, maybe some commits from the suspend state on XID " + xid);
// Do not throw exception as this is just emulating XA
return;
}
if (flag != XAResource.TMNOFLAGS && flag != XAResource.TMJOIN)
throw new BitronixXAException("unsupported start flag " + Decoder.decodeXAResourceFlag(flag), XAException.XAER_RMERR);
if (xid == null)
Expand Down Expand Up @@ -161,6 +169,14 @@ else if (state == PREPARED) {
}

public void end(Xid xid, int flag) throws XAException {
if (flag == XAResource.TMSUSPEND) {
if (state != STARTED) {
throw new BitronixXAException("resource not started on XID " + this.xid, XAException.XAER_PROTO);
}
log.warn("Suspend not supported on XID " + xid + " keep started state.");
// Do not throw exception as this is just emulating XA
return;
}
if (flag != XAResource.TMSUCCESS && flag != XAResource.TMFAIL)
throw new BitronixXAException("unsupported end flag " + Decoder.decodeXAResourceFlag(flag), XAException.XAER_RMERR);
if (xid == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;

import javax.transaction.*;

Expand All @@ -47,17 +48,19 @@ public void testSharedConnectionMultithreaded() throws Exception {
tm.begin();
if (log.isDebugEnabled()) log.debug("*** after begin");

final Transaction suspended = tm.suspend();
final AtomicReference<Transaction> suspended = new AtomicReference<Transaction>(tm.suspend());

final ArrayList twoConnections = new ArrayList();
Thread thread1 = new Thread() {
public void run() {
try {
tm.resume(suspended);
tm.resume(suspended.get());
if (log.isDebugEnabled()) log.debug("*** getting connection from DS1");
Connection connection = poolingDataSource1.getConnection();
connection.createStatement();
twoConnections.add(connection);
// remove transaction from the ThreadLocal
suspended.set(tm.suspend());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand All @@ -70,7 +73,7 @@ public void run() {
Thread thread2 = new Thread() {
public void run() {
try {
tm.resume(suspended);
tm.resume(suspended.get());
if (log.isDebugEnabled()) log.debug("*** getting connection from DS1");
Connection connection = poolingDataSource1.getConnection();
connection.createStatement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,19 +590,16 @@ public void testSuspendResume() throws Exception {
List orderedEvents = EventRecorder.getOrderedEvents();
log.info(EventRecorder.dumpToString());

assertEquals(23, orderedEvents.size());
int i=0;
assertEquals(Status.STATUS_ACTIVE, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
assertEquals(DATASOURCE1_NAME, ((ConnectionDequeuedEvent) orderedEvents.get(i++)).getPooledConnectionImpl().getPoolingDataSource().getUniqueName());
assertEquals(XAResource.TMNOFLAGS, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(DATASOURCE2_NAME, ((ConnectionDequeuedEvent) orderedEvents.get(i++)).getPooledConnectionImpl().getPoolingDataSource().getUniqueName());
assertEquals(XAResource.TMNOFLAGS, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUCCESS, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUCCESS, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());
assertEquals(true, ((XAResourceIsSameRmEvent) orderedEvents.get(i++)).isSameRm());
assertEquals(XAResource.TMJOIN, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(true, ((XAResourceIsSameRmEvent) orderedEvents.get(i++)).isSameRm());
assertEquals(XAResource.TMJOIN, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUSPEND, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUSPEND, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMRESUME, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMRESUME, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUCCESS, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUCCESS, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());
assertEquals(Status.STATUS_PREPARING, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
Expand All @@ -615,6 +612,8 @@ public void testSuspendResume() throws Exception {
assertEquals(Status.STATUS_COMMITTED, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
assertEquals(DATASOURCE1_NAME, ((ConnectionQueuedEvent) orderedEvents.get(i++)).getPooledConnectionImpl().getPoolingDataSource().getUniqueName());
assertEquals(DATASOURCE2_NAME, ((ConnectionQueuedEvent) orderedEvents.get(i++)).getPooledConnectionImpl().getPoolingDataSource().getUniqueName());

assertEquals(21, orderedEvents.size());
}

public void testLooseWorkingCaseOutsideOutside() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,21 +301,19 @@ public void testClosingSuspendedConnectionsInDifferentContext() throws Exception
List orderedEvents = EventRecorder.getOrderedEvents();
log.info(EventRecorder.dumpToString());

assertEquals(18, orderedEvents.size());
int i=0;
assertEquals(Status.STATUS_ACTIVE, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
assertEquals(DATASOURCE1_NAME, ((ConnectionDequeuedEvent) orderedEvents.get(i++)).getPooledConnectionImpl().getPoolingDataSource().getUniqueName());
assertEquals(XAResource.TMNOFLAGS, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUCCESS, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUSPEND, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());

assertEquals(Status.STATUS_ACTIVE, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
assertEquals(Status.STATUS_PREPARING, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
assertEquals(Status.STATUS_PREPARED, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
assertEquals(Status.STATUS_COMMITTING, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
assertEquals(Status.STATUS_COMMITTED, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());

assertEquals(true, ((XAResourceIsSameRmEvent) orderedEvents.get(i++)).isSameRm());
assertEquals(XAResource.TMJOIN, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMRESUME, ((XAResourceStartEvent) orderedEvents.get(i++)).getFlag());
assertEquals(XAResource.TMSUCCESS, ((XAResourceEndEvent) orderedEvents.get(i++)).getFlag());

assertEquals(Status.STATUS_PREPARING, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
Expand All @@ -324,6 +322,8 @@ public void testClosingSuspendedConnectionsInDifferentContext() throws Exception
assertEquals(true, ((XAResourceCommitEvent) orderedEvents.get(i++)).isOnePhase());
assertEquals(Status.STATUS_COMMITTED, ((JournalLogEvent) orderedEvents.get(i++)).getStatus());
assertEquals(DATASOURCE1_NAME, ((ConnectionQueuedEvent) orderedEvents.get(i++)).getPooledConnectionImpl().getPoolingDataSource().getUniqueName());

assertEquals(17, orderedEvents.size());
}

}
Loading