diff --git a/.gitignore b/.gitignore index c66cfe5..fe93c2a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ testFile test_gen_files +xid.log +metadata.db # IntelliJ IDEA .idea diff --git a/src/main/java/net/kaaass/rumbase/record/MvccRecordStorage.java b/src/main/java/net/kaaass/rumbase/record/MvccRecordStorage.java index 29a6a0a..6f3ebc7 100644 --- a/src/main/java/net/kaaass/rumbase/record/MvccRecordStorage.java +++ b/src/main/java/net/kaaass/rumbase/record/MvccRecordStorage.java @@ -14,6 +14,7 @@ import net.kaaass.rumbase.transaction.TransactionIsolation; import net.kaaass.rumbase.transaction.TransactionStatus; import net.kaaass.rumbase.transaction.exception.DeadlockException; +import net.kaaass.rumbase.transaction.exception.StatusException; import java.util.Optional; @@ -58,6 +59,8 @@ public Optional queryOptional(TransactionContext txContext, long recordI txContext.sharedLock(recordId, this.identifiedName); } catch (DeadlockException e) { throw new NeedRollbackException(2, e); + } catch (StatusException e) { + throw new RecordNotFoundException(3, e); } } // 读取数据 @@ -93,6 +96,8 @@ public void delete(TransactionContext txContext, long recordId) throws RecordNot txContext.exclusiveLock(recordId, this.identifiedName); } catch (DeadlockException e) { throw new NeedRollbackException(2, e); + } catch (StatusException e) { + throw new RecordNotFoundException(3, e); } var xid = txContext.getXid(); if (xid == 0) { diff --git a/src/main/java/net/kaaass/rumbase/record/exception/RecordNotFoundException.java b/src/main/java/net/kaaass/rumbase/record/exception/RecordNotFoundException.java index fd5857b..0c8644b 100644 --- a/src/main/java/net/kaaass/rumbase/record/exception/RecordNotFoundException.java +++ b/src/main/java/net/kaaass/rumbase/record/exception/RecordNotFoundException.java @@ -10,6 +10,7 @@ *

* E5001-1 物理记录不存在 * E5001-2 由于事务性,记录不可见 + * E5001-3 事务未处于活动状态,不可访问记录 * * @author kaaass */ @@ -18,6 +19,7 @@ public class RecordNotFoundException extends RumbaseException { public static final Map REASONS = new HashMap<>() {{ put(1, "物理记录不存在"); put(2, "由于事务隔离或已经被删除,记录不可见"); + put(3, "事务未处于活动状态,不可访问记录"); }}; /** diff --git a/src/main/java/net/kaaass/rumbase/server/Session.java b/src/main/java/net/kaaass/rumbase/server/Session.java index bcb033d..5db2376 100644 --- a/src/main/java/net/kaaass/rumbase/server/Session.java +++ b/src/main/java/net/kaaass/rumbase/server/Session.java @@ -19,6 +19,7 @@ import net.kaaass.rumbase.table.exception.TableExistenceException; import net.kaaass.rumbase.transaction.TransactionContext; import net.kaaass.rumbase.transaction.TransactionIsolation; +import net.kaaass.rumbase.transaction.exception.StatusException; import java.io.*; import java.net.Socket; @@ -150,6 +151,7 @@ public void onClose() { } catch (RumbaseRuntimeException e) { log.warn("退出会话 {} 时提交事务失败", sessionId, e); say(e); + } catch (StatusException ignored) { } } // 删除活跃会话 @@ -383,10 +385,15 @@ private void checkAutoCommitAfter(boolean rollback) { if (autoCommit) { try { assert currentContext != null; - if (rollback) { - currentContext.rollback(); - } else { - currentContext.commit(); + try { + if (rollback) { + currentContext.rollback(); + } else { + currentContext.commit(); + } + } catch (StatusException e) { + log.warn("自动提交事务失败,会话 {} ", sessionId, e); + say(e); } currentContext = null; } finally { @@ -416,7 +423,12 @@ public Boolean visit(CommitStatement statement) { return false; } // 提交事务 - currentContext.commit(); + try { + currentContext.commit(); + } catch (StatusException e) { + say(e); + return false; + } say("成功提交事务" + currentContext.getXid() + "\n"); currentContext = null; return false; @@ -429,7 +441,12 @@ public Boolean visit(RollbackStatement statement) { return false; } // 回滚事务 - currentContext.rollback(); + try { + currentContext.rollback(); + } catch (StatusException e) { + say(e); + return false; + } say("成功回滚事务" + currentContext.getXid() + "\n"); currentContext = null; return false; diff --git a/src/main/java/net/kaaass/rumbase/table/TableManager.java b/src/main/java/net/kaaass/rumbase/table/TableManager.java index ecb925a..5695809 100644 --- a/src/main/java/net/kaaass/rumbase/table/TableManager.java +++ b/src/main/java/net/kaaass/rumbase/table/TableManager.java @@ -11,6 +11,7 @@ import net.kaaass.rumbase.table.exception.TableExistenceException; import net.kaaass.rumbase.table.field.VarcharField; import net.kaaass.rumbase.transaction.TransactionContext; +import net.kaaass.rumbase.transaction.exception.StatusException; import java.io.File; import java.util.*; @@ -55,7 +56,7 @@ public class TableManager { * * @param context 事务context */ - public void commit(TransactionContext context) { + public void commit(TransactionContext context) throws StatusException { context.commit(); } @@ -64,7 +65,7 @@ public void commit(TransactionContext context) { * * @param context 事务context */ - public void abort(TransactionContext context) { + public void abort(TransactionContext context) throws StatusException { context.rollback(); } diff --git a/src/main/java/net/kaaass/rumbase/transaction/TransactionContext.java b/src/main/java/net/kaaass/rumbase/transaction/TransactionContext.java index f0f1692..bd8ec79 100644 --- a/src/main/java/net/kaaass/rumbase/transaction/TransactionContext.java +++ b/src/main/java/net/kaaass/rumbase/transaction/TransactionContext.java @@ -1,6 +1,7 @@ package net.kaaass.rumbase.transaction; import net.kaaass.rumbase.transaction.exception.DeadlockException; +import net.kaaass.rumbase.transaction.exception.StatusException; import java.util.List; @@ -61,17 +62,17 @@ static TransactionContext empty() { /** * 事务开始 */ - void start(); + void start() throws StatusException; /** * 事务提交 */ - void commit(); + void commit() throws StatusException; /** * 事务撤销 */ - void rollback(); + void rollback() throws StatusException; /** * 对记录加共享锁 @@ -80,7 +81,7 @@ static TransactionContext empty() { * @param tableName 表字段 * @throws DeadlockException 发生死锁异常 */ - void sharedLock(long uuid, String tableName) throws DeadlockException; + void sharedLock(long uuid, String tableName) throws DeadlockException, StatusException; /** * 对记录加排他锁 @@ -89,5 +90,5 @@ static TransactionContext empty() { * @param tableName 表字段 * @throws DeadlockException 发生死锁异常 */ - void exclusiveLock(long uuid, String tableName) throws DeadlockException; + void exclusiveLock(long uuid, String tableName) throws DeadlockException, StatusException; } diff --git a/src/main/java/net/kaaass/rumbase/transaction/TransactionContextImpl.java b/src/main/java/net/kaaass/rumbase/transaction/TransactionContextImpl.java index bb803bd..643f127 100644 --- a/src/main/java/net/kaaass/rumbase/transaction/TransactionContextImpl.java +++ b/src/main/java/net/kaaass/rumbase/transaction/TransactionContextImpl.java @@ -3,11 +3,14 @@ import lombok.Getter; import lombok.Setter; import net.kaaass.rumbase.transaction.exception.DeadlockException; +import net.kaaass.rumbase.transaction.exception.StatusException; import net.kaaass.rumbase.transaction.lock.LockTable; import net.kaaass.rumbase.transaction.lock.LockTableImpl; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * 事务上下文的实现 @@ -36,6 +39,10 @@ public class TransactionContextImpl implements TransactionContext { */ @Getter private final List snapshot; + /** + * 状态互斥锁 + */ + private final Lock statusLock = new ReentrantLock(); /** * 事务状态 */ @@ -122,44 +129,68 @@ public TransactionContextImpl(int xid, TransactionIsolation isolation, Transacti * 开始事务 */ @Override - public void start() { - this.status = TransactionStatus.ACTIVE; - if (manager != null) { - manager.changeTransactionStatus(xid, TransactionStatus.ACTIVE); + public void start() throws StatusException { + statusLock.lock(); + try { + if (this.xid != 0 && !this.status.equals(TransactionStatus.PREPARING)) { + throw new StatusException(1); + } + this.status = TransactionStatus.ACTIVE; + if (manager != null) { + manager.changeTransactionStatus(xid, TransactionStatus.ACTIVE); + } + } finally { + statusLock.unlock(); } - } /** * 提交事务 */ @Override - public void commit() { - // 修改状态 - this.status = TransactionStatus.COMMITTED; - if (manager != null) { - manager.changeTransactionStatus(xid, TransactionStatus.COMMITTED); + public void commit() throws StatusException { + statusLock.lock(); + try { + if (this.xid != 0 && !this.status.equals(TransactionStatus.ACTIVE)) { + throw new StatusException(1); + } + // 修改状态 + this.status = TransactionStatus.COMMITTED; + if (manager != null) { + manager.changeTransactionStatus(xid, TransactionStatus.COMMITTED); + } + + // 释放锁 + LockTable lockTable = LockTableImpl.getInstance(); + lockTable.release(xid); + } finally { + statusLock.unlock(); } - - // 释放锁 - LockTable lockTable = LockTableImpl.getInstance(); - lockTable.release(xid); } /** * 中止事务 */ @Override - public void rollback() { - // 修改状态 - this.status = TransactionStatus.ABORTED; - if (manager != null) { - manager.changeTransactionStatus(xid, TransactionStatus.ABORTED); + public void rollback() throws StatusException { + statusLock.lock(); + try { + if (this.xid != 0 && !this.status.equals(TransactionStatus.ACTIVE)) { + throw new StatusException(1); + } + + // 修改状态 + this.status = TransactionStatus.ABORTED; + if (manager != null) { + manager.changeTransactionStatus(xid, TransactionStatus.ABORTED); + } + + // 释放锁 + LockTable lockTable = LockTableImpl.getInstance(); + lockTable.release(xid); + } finally { + statusLock.unlock(); } - - // 释放锁 - LockTable lockTable = LockTableImpl.getInstance(); - lockTable.release(xid); } /** @@ -169,10 +200,19 @@ public void rollback() { * @param tableName 表字段 */ @Override - public void sharedLock(long uuid, String tableName) throws DeadlockException { - //TODO 加锁 - LockTable lockTable = LockTableImpl.getInstance(); - lockTable.addSharedLock(xid, uuid, tableName); + public void sharedLock(long uuid, String tableName) throws DeadlockException, StatusException { + statusLock.lock(); + try { + if (this.xid != 0 && !this.status.equals(TransactionStatus.ACTIVE)) { + throw new StatusException(1); + } + + LockTable lockTable = LockTableImpl.getInstance(); + lockTable.addSharedLock(xid, uuid, tableName); + } finally { + statusLock.unlock(); + } + } /** @@ -182,10 +222,17 @@ public void sharedLock(long uuid, String tableName) throws DeadlockException { * @param tableName 表字段 */ @Override - public void exclusiveLock(long uuid, String tableName) throws DeadlockException { - //TODO 加锁 - LockTable lockTable = LockTableImpl.getInstance(); - lockTable.addExclusiveLock(xid, uuid, tableName); + public void exclusiveLock(long uuid, String tableName) throws DeadlockException, StatusException { + statusLock.lock(); + try { + if (this.xid != 0 && !this.status.equals(TransactionStatus.ACTIVE)) { + throw new StatusException(1); + } + LockTable lockTable = LockTableImpl.getInstance(); + lockTable.addExclusiveLock(xid, uuid, tableName); + } finally { + statusLock.unlock(); + } } } diff --git a/src/main/java/net/kaaass/rumbase/transaction/exception/StatusException.java b/src/main/java/net/kaaass/rumbase/transaction/exception/StatusException.java new file mode 100644 index 0000000..6801f39 --- /dev/null +++ b/src/main/java/net/kaaass/rumbase/transaction/exception/StatusException.java @@ -0,0 +1,31 @@ +package net.kaaass.rumbase.transaction.exception; + + +import net.kaaass.rumbase.exception.RumbaseException; + +import java.util.HashMap; +import java.util.Map; + +/** + * E6002 事务状态异常 + *

+ * E6002-1 事务状态异常 + * + * @author criki + */ +public class StatusException extends RumbaseException { + + public static final Map REASONS = new HashMap<>() {{ + put(1, "事务状态异常"); + }}; + + /** + * 事务状态异常 + * + * @param subId 子错误号 + */ + public StatusException(int subId) { + super(6001, subId, REASONS.get(subId)); + } + +} diff --git a/src/main/java/net/kaaass/rumbase/transaction/lock/Graph.java b/src/main/java/net/kaaass/rumbase/transaction/lock/Graph.java index 1301612..43d1075 100644 --- a/src/main/java/net/kaaass/rumbase/transaction/lock/Graph.java +++ b/src/main/java/net/kaaass/rumbase/transaction/lock/Graph.java @@ -94,4 +94,11 @@ private boolean dfs(int u) { visited.put(u, 1); return false; } + + @Override + public String toString() { + return "Graph{" + + "waitGraph=" + waitGraph + + '}'; + } } diff --git a/src/main/java/net/kaaass/rumbase/transaction/lock/LockTableImpl.java b/src/main/java/net/kaaass/rumbase/transaction/lock/LockTableImpl.java index 7b617cf..3acac7a 100644 --- a/src/main/java/net/kaaass/rumbase/transaction/lock/LockTableImpl.java +++ b/src/main/java/net/kaaass/rumbase/transaction/lock/LockTableImpl.java @@ -75,23 +75,29 @@ private void addLockTemplate(int xid, DataItemId id, LockMode mode) throws Deadl // 判断是否能加锁 boolean canGrant = list.canGrant(mode); log.info("{} can grant {} lock : {}", xid, mode, canGrant); - // 虚加锁 - list.weakInsert(xid, id, mode, canGrant); - // 检测死锁 - if (deadlockCheck()) { - log.info("deadlock"); - list.pop(); - throw new DeadlockException(1); + // TODO 并发度差,最好改成读写锁 + synchronized (this) { + // 虚加锁 + list.weakInsert(xid, id, mode, canGrant); + // 检测死锁 + if (deadlockCheck()) { + log.info("deadlock"); + list.pop(); + throw new DeadlockException(1); + } } // 可以加锁 // 对于互斥锁,如果发生等待,在等待处即已释放,此处无需释放 canUnlock = canGrant; - // 移除虚锁 - list.pop(); - // 正式加锁 - list.insert(xid, id, mode, canGrant); + // TODO 并发度差,最好改成读写锁 + synchronized (LockTableImpl.class) { + // 移除虚锁 + list.pop(); + // 正式加锁 + list.insert(xid, id, mode, canGrant); + } } finally { if (canUnlock) { list.mutexLock.unlock(); @@ -133,21 +139,26 @@ public void addExclusiveLock(int xid, long uuid, String tableName) throws Deadlo */ @Override public void release(int xid) { - Set dataItemSet = new HashSet<>(); - List sharedLocks = TxList.sharedLocks.get(xid); - List exclusiveLocks = TxList.exclusiveLocks.get(xid); - log.info("{}'s sharedLocks: {}", xid, sharedLocks); - log.info("{}'s exclusiveLocks: {}", xid, exclusiveLocks); - if (sharedLocks != null) { - dataItemSet.addAll(sharedLocks); - } - if (exclusiveLocks != null) { + lock.lock(); + try { + Set dataItemSet = new HashSet<>(); + List sharedLocks = TxList.sharedLocks.get(xid); + List exclusiveLocks = TxList.exclusiveLocks.get(xid); + log.info("{}'s sharedLocks: {}", xid, sharedLocks); + log.info("{}'s exclusiveLocks: {}", xid, exclusiveLocks); + if (sharedLocks != null) { + dataItemSet.addAll(sharedLocks); + } + if (exclusiveLocks != null) { - dataItemSet.addAll(exclusiveLocks); - } + dataItemSet.addAll(exclusiveLocks); + } - for (DataItemId id : dataItemSet) { - release(xid, id); + for (DataItemId id : dataItemSet) { + release(xid, id); + } + } finally { + lock.unlock(); } } @@ -241,8 +252,11 @@ private boolean deadlockCheck() { // 建图 // 遍历每一个等待队列 - for (TxList list : lockTable.values()) { + var lockTableView = Collections.unmodifiableMap(lockTable); + log.debug("Lock table: {}", lockTableView); + for (TxList list : lockTableView.values()) { List waitingTxs = new ArrayList<>(list.locks); + log.debug("locks: {}", list.locks); // 对等待队列中建立等待关系 for (int i = 0; i < waitingTxs.size() - 1; i++) { TxItem frontItem = waitingTxs.get(i); @@ -256,7 +270,7 @@ private boolean deadlockCheck() { continue; } - log.info("[CREATING GRAPH] add edge : {} -> {}", backItem.xid, frontItem.xid); + log.debug("[CREATING GRAPH] add edge : {} -> {}", backItem.xid, frontItem.xid); // backItem等待frontItem graph.addEdge(backItem.xid, frontItem.xid); @@ -267,13 +281,13 @@ private boolean deadlockCheck() { // 邻近的后面的锁等待该锁 TxItem backItem = waitingTxs.get(i + 1); // backItem等待frontItem - log.info("[CREATING GRAPH] add edge : {} -> {}", backItem.xid, frontItem.xid); + log.debug("[CREATING GRAPH] add edge : {} -> {}", backItem.xid, frontItem.xid); graph.addEdge(backItem.xid, frontItem.xid); } } } - log.info("create graph successful!"); + log.debug("create graph successful: {}", graph); // 图成环,则有死锁 return graph.hasLoop(); } diff --git a/src/main/java/net/kaaass/rumbase/transaction/lock/LockTableManager.java b/src/main/java/net/kaaass/rumbase/transaction/lock/LockTableManager.java index 77bd2b2..6b37c95 100644 --- a/src/main/java/net/kaaass/rumbase/transaction/lock/LockTableManager.java +++ b/src/main/java/net/kaaass/rumbase/transaction/lock/LockTableManager.java @@ -12,6 +12,7 @@ * * @author criki */ +@Deprecated public class LockTableManager { /** * 表名与锁表的映射 diff --git a/src/main/java/net/kaaass/rumbase/transaction/lock/TxItem.java b/src/main/java/net/kaaass/rumbase/transaction/lock/TxItem.java index 9bafce6..2db6513 100644 --- a/src/main/java/net/kaaass/rumbase/transaction/lock/TxItem.java +++ b/src/main/java/net/kaaass/rumbase/transaction/lock/TxItem.java @@ -93,4 +93,13 @@ public void abort() { lock.unlock(); } } + + @Override + public String toString() { + return "TxItem{" + + "granted=" + granted + + ", xid=" + xid + + ", mode=" + mode + + '}'; + } } diff --git a/src/main/java/net/kaaass/rumbase/transaction/lock/TxList.java b/src/main/java/net/kaaass/rumbase/transaction/lock/TxList.java index 2ad3386..a264c62 100644 --- a/src/main/java/net/kaaass/rumbase/transaction/lock/TxList.java +++ b/src/main/java/net/kaaass/rumbase/transaction/lock/TxList.java @@ -131,4 +131,11 @@ public void weakInsert(int xid, DataItemId id, LockMode mode, boolean granted) { // 加入等待队列 locks.add(item); } + + @Override + public String toString() { + return "TxList{" + + "locks=" + locks + + '}'; + } } diff --git a/src/test/java/net/kaaass/rumbase/record/MvccReadCommitTest.java b/src/test/java/net/kaaass/rumbase/record/MvccReadCommitTest.java index e12d1bb..772c9e8 100644 --- a/src/test/java/net/kaaass/rumbase/record/MvccReadCommitTest.java +++ b/src/test/java/net/kaaass/rumbase/record/MvccReadCommitTest.java @@ -8,6 +8,7 @@ import net.kaaass.rumbase.transaction.TransactionIsolation; import net.kaaass.rumbase.transaction.TransactionManager; import net.kaaass.rumbase.transaction.TransactionManagerImpl; +import net.kaaass.rumbase.transaction.exception.StatusException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -50,7 +51,7 @@ public void testReadSelf() throws RecordNotFoundException { } @Test - public void testReadOther() throws RecordNotFoundException { + public void testReadOther() throws RecordNotFoundException, StatusException { var storage = RecordManager.fromFile(PATH + "testReadOther"); var manager = new FakeTxManager(TransactionIsolation.READ_COMMITTED); // 创建事务12 @@ -78,7 +79,7 @@ public void testReadOther() throws RecordNotFoundException { } @Test - public void testDelete() throws RecordNotFoundException { + public void testDelete() throws RecordNotFoundException, StatusException { var storage = RecordManager.fromFile(PATH + "testDelete"); var manager = new FakeTxManager(TransactionIsolation.READ_COMMITTED); // 创建事务1、记录a1a2 @@ -104,7 +105,7 @@ public void testDelete() throws RecordNotFoundException { } @Test - public void testReadSelfReal() throws RecordNotFoundException, IOException, FileException { + public void testReadSelfReal() throws RecordNotFoundException, IOException, FileException, StatusException { var storage = RecordManager.fromFile(PATH + "testReadSelfReal"); var manager = new TransactionManagerImpl(); // 创建事务1 @@ -128,7 +129,7 @@ public void testReadSelfReal() throws RecordNotFoundException, IOException, File } @Test - public void testReadOtherReal() throws RecordNotFoundException, IOException, FileException { + public void testReadOtherReal() throws RecordNotFoundException, IOException, FileException, StatusException { var storage = RecordManager.fromFile(PATH + "testReadOtherReal"); var manager = new TransactionManagerImpl(); // 创建事务12 @@ -160,7 +161,7 @@ public void testReadOtherReal() throws RecordNotFoundException, IOException, Fil } @Test - public void testDeleteReal() throws RecordNotFoundException, IOException, FileException { + public void testDeleteReal() throws RecordNotFoundException, IOException, FileException, StatusException { var storage = RecordManager.fromFile(PATH + "testDeleteReal"); var manager = new TransactionManagerImpl(); // 创建事务1、记录a1a2 diff --git a/src/test/java/net/kaaass/rumbase/record/MvccReadRepeatableTest.java b/src/test/java/net/kaaass/rumbase/record/MvccReadRepeatableTest.java index 3f35268..91d1e2e 100644 --- a/src/test/java/net/kaaass/rumbase/record/MvccReadRepeatableTest.java +++ b/src/test/java/net/kaaass/rumbase/record/MvccReadRepeatableTest.java @@ -10,6 +10,7 @@ import net.kaaass.rumbase.transaction.TransactionIsolation; import net.kaaass.rumbase.transaction.TransactionManager; import net.kaaass.rumbase.transaction.TransactionManagerImpl; +import net.kaaass.rumbase.transaction.exception.StatusException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -52,7 +53,7 @@ public void testReadSelf() throws RecordNotFoundException { } @Test - public void testReadOther() throws RecordNotFoundException { + public void testReadOther() throws RecordNotFoundException, StatusException { var storage = RecordManager.fromFile(PATH + "testReadOther"); var manager = new FakeTxManager(TransactionIsolation.REPEATABLE_READ); // 创建事务12 @@ -80,7 +81,7 @@ public void testReadOther() throws RecordNotFoundException { } @Test - public void testDelete() throws RecordNotFoundException { + public void testDelete() throws RecordNotFoundException, StatusException { var storage = RecordManager.fromFile(PATH + "testDelete"); var manager = new FakeTxManager(TransactionIsolation.REPEATABLE_READ); // 创建事务1、记录a1a2 @@ -106,7 +107,7 @@ public void testDelete() throws RecordNotFoundException { } @Test - public void testVersionSkip() throws RecordNotFoundException, NeedRollbackException { + public void testVersionSkip() throws RecordNotFoundException, NeedRollbackException, StatusException { var storage = RecordManager.fromFile(PATH + "testDelete"); var manager = new FakeTxManager(TransactionIsolation.REPEATABLE_READ); // 创建公共版本 @@ -128,7 +129,7 @@ public void testVersionSkip() throws RecordNotFoundException, NeedRollbackExcept } @Test - public void testReadSelfReal() throws RecordNotFoundException, IOException, FileException { + public void testReadSelfReal() throws RecordNotFoundException, IOException, FileException, StatusException { var storage = RecordManager.fromFile(PATH + "testReadSelfReal"); var manager = new TransactionManagerImpl(); // 创建事务1 @@ -152,7 +153,7 @@ public void testReadSelfReal() throws RecordNotFoundException, IOException, File } @Test - public void testReadOtherReal() throws RecordNotFoundException, IOException, FileException { + public void testReadOtherReal() throws RecordNotFoundException, IOException, FileException, StatusException { var storage = RecordManager.fromFile(PATH + "testReadOtherReal"); var manager = new TransactionManagerImpl(); // 创建事务12 @@ -185,7 +186,7 @@ public void testReadOtherReal() throws RecordNotFoundException, IOException, Fil } @Test - public void testDeleteReal() throws RecordNotFoundException, IOException, FileException { + public void testDeleteReal() throws RecordNotFoundException, IOException, FileException, StatusException { var storage = RecordManager.fromFile(PATH + "testDeleteReal"); var manager = new TransactionManagerImpl(); // 创建事务1、记录a1a2 @@ -216,7 +217,7 @@ public void testDeleteReal() throws RecordNotFoundException, IOException, FileEx } @Test - public void testVersionSkipReal() throws RecordNotFoundException, NeedRollbackException, IOException, FileException { + public void testVersionSkipReal() throws RecordNotFoundException, NeedRollbackException, IOException, FileException, StatusException { var storage = RecordManager.fromFile(PATH + "testDeleteReal"); var manager = new TransactionManagerImpl(); // 创建公共版本 diff --git a/src/test/java/net/kaaass/rumbase/transaction/TransactionContextTest.java b/src/test/java/net/kaaass/rumbase/transaction/TransactionContextTest.java index fa4fa23..189b90d 100644 --- a/src/test/java/net/kaaass/rumbase/transaction/TransactionContextTest.java +++ b/src/test/java/net/kaaass/rumbase/transaction/TransactionContextTest.java @@ -4,15 +4,15 @@ import net.kaaass.rumbase.FileUtil; import net.kaaass.rumbase.page.exception.FileException; import net.kaaass.rumbase.transaction.exception.DeadlockException; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; +import net.kaaass.rumbase.transaction.exception.StatusException; +import org.junit.*; import java.io.File; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import static junit.framework.TestCase.assertTrue; + /** * 测试事务上下文 * @@ -57,7 +57,7 @@ public void testCreateTransaction() throws IOException, FileException { * 测试事务变化 */ @Test - public void testChangeStatus() throws IOException, FileException { + public void testChangeStatus() throws IOException, FileException, StatusException { var manager = new TransactionManagerImpl("test_gen_files/test_change.log"); var committedTransaction = manager.createTransactionContext(TransactionIsolation.READ_COMMITTED); // 事务初始状态 @@ -82,7 +82,7 @@ public void testChangeStatus() throws IOException, FileException { * 测试事务持久化 */ @Test - public void testTransactionPersistence() throws IOException, FileException { + public void testTransactionPersistence() throws IOException, FileException, StatusException { var manager = new TransactionManagerImpl("test_gen_files/test_persistence.log"); // 事务创建,事务状态记录数改变 var transaction1 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); @@ -115,7 +115,7 @@ public void testTransactionPersistence() throws IOException, FileException { * 测试事务状态复原 */ @Test - public void testTransactionRecovery() throws IOException, FileException { + public void testTransactionRecovery() throws IOException, FileException, StatusException { var manager = new TransactionManagerImpl("test_gen_files/test_recovery.log"); // 事务创建,事务状态记录数改变 var transaction1 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); @@ -136,12 +136,14 @@ public void testTransactionRecovery() throws IOException, FileException { * 测试事务上锁 */ @Test - public void testAddLock() throws IOException, FileException { + public void testAddLock() throws IOException, FileException, StatusException { var manager = new TransactionManagerImpl("test_gen_files/test_add_lock.log"); var transaction1 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); var transaction2 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); String tableName = "test"; + transaction1.start(); + transaction2.start(); // 互斥锁 new Thread(() -> { @@ -151,7 +153,11 @@ public void testAddLock() throws IOException, FileException { e.printStackTrace(); } log.info("transaction2 commit"); - transaction2.commit(); + try { + transaction2.commit(); + } catch (StatusException e) { + e.printStackTrace(); + } }).start(); try { transaction1.exclusiveLock(1, tableName); @@ -171,7 +177,7 @@ public void testAddLock() throws IOException, FileException { transaction1.commit(); transaction2.rollback(); - } catch (DeadlockException e) { + } catch (DeadlockException | StatusException e) { e.printStackTrace(); } } @@ -180,37 +186,155 @@ public void testAddLock() throws IOException, FileException { * 测试死锁 */ @Test - public void testDeadlock() throws IOException, FileException, InterruptedException { + public void testDeadlock() throws IOException, FileException, InterruptedException, StatusException { var manager = new TransactionManagerImpl("test_gen_files/test_deadlock.log"); - var transaction1 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); - var transaction2 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); - String tableName = "test"; + for (int i = 0; i < 5; i++) { + log.info("============= Test times {} =============", i); + var transaction1 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); + var transaction2 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); + String tableName = "test"; - AtomicBoolean deadlockDetect = new AtomicBoolean(false); - new Thread(() -> { + transaction1.start(); + transaction2.start(); + + AtomicBoolean syncPoint = new AtomicBoolean(false); + AtomicBoolean deadlockDetect = new AtomicBoolean(false); + Thread thread = new Thread(() -> { + try { + while (!syncPoint.get()) { + Thread.sleep(10); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + transaction2.exclusiveLock(1, tableName); + } catch (DeadlockException e) { + deadlockDetect.set(true); + e.printStackTrace(); + try { + transaction2.rollback(); + } catch (StatusException statusException) { + statusException.printStackTrace(); + } + } catch (StatusException e) { + e.printStackTrace(); + } + }); + thread.start(); try { - Thread.sleep(3); - } catch (InterruptedException e) { + transaction1.exclusiveLock(1, tableName); + transaction2.exclusiveLock(2, tableName); + syncPoint.set(true); + transaction1.exclusiveLock(2, tableName); + } catch (DeadlockException e) { + deadlockDetect.set(true); e.printStackTrace(); + log.info("rollback tx2"); + transaction2.rollback(); } + thread.join(); + assertTrue("Deadlock should be detected", deadlockDetect.get()); + transaction1.commit(); + log.info("tx1 committed"); + } + } + + @Ignore + public void testDeadlock3() throws IOException, FileException, InterruptedException, StatusException { + // FIXME 三线程死锁问题仍不能解决 + var manager = new TransactionManagerImpl("test_gen_files/test_deadlock3.log"); + for (int i = 0; i < 1; i++) { + log.info("============= Test times {} =============", i); + var tx1 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); + var tx2 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); + var tx3 = manager.createTransactionContext(TransactionIsolation.READ_UNCOMMITTED); + String tableName = "test"; + + tx1.start(); + tx2.start(); + tx3.start(); + + AtomicBoolean syncPoint = new AtomicBoolean(false); + AtomicBoolean deadlockDetect = new AtomicBoolean(false); + var thread2 = new Thread(() -> { + try { + while (!syncPoint.get()) { + Thread.sleep(3); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + tx2.exclusiveLock(3, tableName); + } catch (DeadlockException e) { + deadlockDetect.set(true); + e.printStackTrace(); + try { + log.info("rollback tx2"); + tx2.rollback(); + } catch (StatusException statusException) { + statusException.printStackTrace(); + } + } catch (StatusException e) { + e.printStackTrace(); + } + log.info("thread2 alive"); + }); + var thread3 = new Thread(() -> { + try { + while (!syncPoint.get()) { + Thread.sleep(3); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + tx3.exclusiveLock(1, tableName); + } catch (DeadlockException e) { + deadlockDetect.set(true); + e.printStackTrace(); + try { + log.info("rollback tx3"); + tx3.rollback(); + } catch (StatusException statusException) { + statusException.printStackTrace(); + } + } catch (StatusException e) { + e.printStackTrace(); + } + log.info("thread3 alive"); + }); + thread2.start(); + thread3.start(); try { - transaction2.exclusiveLock(1, tableName); + tx1.exclusiveLock(1, tableName); + tx2.exclusiveLock(2, tableName); + tx3.exclusiveLock(3, tableName); + syncPoint.set(true); + tx1.exclusiveLock(2, tableName); } catch (DeadlockException e) { deadlockDetect.set(true); - transaction2.rollback(); e.printStackTrace(); + log.info("rollback tx1"); + tx1.rollback(); + } + log.info("thread1 alive"); + thread2.join(); + thread3.join(); + assertTrue("Deadlock should be detected", deadlockDetect.get()); + if (tx1.getStatus() == TransactionStatus.ACTIVE) { + tx1.commit(); + log.info("tx1 committed"); + } + if (tx2.getStatus() == TransactionStatus.ACTIVE) { + tx2.commit(); + log.info("tx2 committed"); + } + if (tx3.getStatus() == TransactionStatus.ACTIVE) { + tx3.commit(); + log.info("tx3 committed"); } - }).start(); - try { - transaction1.exclusiveLock(1, tableName); - transaction2.exclusiveLock(2, tableName); - transaction1.exclusiveLock(2, tableName); - } catch (DeadlockException e) { - deadlockDetect.set(true); - transaction2.rollback(); - e.printStackTrace(); } - Thread.sleep(10); - Assert.assertTrue("Deadlock should be detected", deadlockDetect.get()); } }