Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
testFile
test_gen_files
xid.log
metadata.db

# IntelliJ IDEA
.idea
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import net.kaaass.rumbase.transaction.TransactionIsolation;
import net.kaaass.rumbase.transaction.TransactionStatus;
import net.kaaass.rumbase.transaction.exception.DeadlockException;
import net.kaaass.rumbase.transaction.exception.StatusException;

import java.util.Optional;

Expand Down Expand Up @@ -58,6 +59,8 @@ public Optional<byte[]> queryOptional(TransactionContext txContext, long recordI
txContext.sharedLock(recordId, this.identifiedName);
} catch (DeadlockException e) {
throw new NeedRollbackException(2, e);
} catch (StatusException e) {
throw new RecordNotFoundException(3, e);
}
}
// 读取数据
Expand Down Expand Up @@ -93,6 +96,8 @@ public void delete(TransactionContext txContext, long recordId) throws RecordNot
txContext.exclusiveLock(recordId, this.identifiedName);
} catch (DeadlockException e) {
throw new NeedRollbackException(2, e);
} catch (StatusException e) {
throw new RecordNotFoundException(3, e);
}
var xid = txContext.getXid();
if (xid == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* <p>
* E5001-1 物理记录不存在
* E5001-2 由于事务性,记录不可见
* E5001-3 事务未处于活动状态,不可访问记录
*
* @author kaaass
*/
Expand All @@ -18,6 +19,7 @@ public class RecordNotFoundException extends RumbaseException {
public static final Map<Integer, String> REASONS = new HashMap<>() {{
put(1, "物理记录不存在");
put(2, "由于事务隔离或已经被删除,记录不可见");
put(3, "事务未处于活动状态,不可访问记录");
}};

/**
Expand Down
29 changes: 23 additions & 6 deletions src/main/java/net/kaaass/rumbase/server/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import net.kaaass.rumbase.table.exception.TableExistenceException;
import net.kaaass.rumbase.transaction.TransactionContext;
import net.kaaass.rumbase.transaction.TransactionIsolation;
import net.kaaass.rumbase.transaction.exception.StatusException;

import java.io.*;
import java.net.Socket;
Expand Down Expand Up @@ -150,6 +151,7 @@ public void onClose() {
} catch (RumbaseRuntimeException e) {
log.warn("退出会话 {} 时提交事务失败", sessionId, e);
say(e);
} catch (StatusException ignored) {
}
}
// 删除活跃会话
Expand Down Expand Up @@ -383,10 +385,15 @@ private void checkAutoCommitAfter(boolean rollback) {
if (autoCommit) {
try {
assert currentContext != null;
if (rollback) {
currentContext.rollback();
} else {
currentContext.commit();
try {
if (rollback) {
currentContext.rollback();
} else {
currentContext.commit();
}
} catch (StatusException e) {
log.warn("自动提交事务失败,会话 {} ", sessionId, e);
say(e);
}
currentContext = null;
} finally {
Expand Down Expand Up @@ -416,7 +423,12 @@ public Boolean visit(CommitStatement statement) {
return false;
}
// 提交事务
currentContext.commit();
try {
currentContext.commit();
} catch (StatusException e) {
say(e);
return false;
}
say("成功提交事务" + currentContext.getXid() + "\n");
currentContext = null;
return false;
Expand All @@ -429,7 +441,12 @@ public Boolean visit(RollbackStatement statement) {
return false;
}
// 回滚事务
currentContext.rollback();
try {
currentContext.rollback();
} catch (StatusException e) {
say(e);
return false;
}
say("成功回滚事务" + currentContext.getXid() + "\n");
currentContext = null;
return false;
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/net/kaaass/rumbase/table/TableManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import net.kaaass.rumbase.table.exception.TableExistenceException;
import net.kaaass.rumbase.table.field.VarcharField;
import net.kaaass.rumbase.transaction.TransactionContext;
import net.kaaass.rumbase.transaction.exception.StatusException;

import java.io.File;
import java.util.*;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class TableManager {
*
* @param context 事务context
*/
public void commit(TransactionContext context) {
public void commit(TransactionContext context) throws StatusException {
context.commit();
}

Expand All @@ -64,7 +65,7 @@ public void commit(TransactionContext context) {
*
* @param context 事务context
*/
public void abort(TransactionContext context) {
public void abort(TransactionContext context) throws StatusException {
context.rollback();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.kaaass.rumbase.transaction;

import net.kaaass.rumbase.transaction.exception.DeadlockException;
import net.kaaass.rumbase.transaction.exception.StatusException;

import java.util.List;

Expand Down Expand Up @@ -61,17 +62,17 @@ static TransactionContext empty() {
/**
* 事务开始
*/
void start();
void start() throws StatusException;

/**
* 事务提交
*/
void commit();
void commit() throws StatusException;

/**
* 事务撤销
*/
void rollback();
void rollback() throws StatusException;

/**
* 对记录加共享锁
Expand All @@ -80,7 +81,7 @@ static TransactionContext empty() {
* @param tableName 表字段
* @throws DeadlockException 发生死锁异常
*/
void sharedLock(long uuid, String tableName) throws DeadlockException;
void sharedLock(long uuid, String tableName) throws DeadlockException, StatusException;

/**
* 对记录加排他锁
Expand All @@ -89,5 +90,5 @@ static TransactionContext empty() {
* @param tableName 表字段
* @throws DeadlockException 发生死锁异常
*/
void exclusiveLock(long uuid, String tableName) throws DeadlockException;
void exclusiveLock(long uuid, String tableName) throws DeadlockException, StatusException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import lombok.Getter;
import lombok.Setter;
import net.kaaass.rumbase.transaction.exception.DeadlockException;
import net.kaaass.rumbase.transaction.exception.StatusException;
import net.kaaass.rumbase.transaction.lock.LockTable;
import net.kaaass.rumbase.transaction.lock.LockTableImpl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 事务上下文的实现
Expand Down Expand Up @@ -36,6 +39,10 @@ public class TransactionContextImpl implements TransactionContext {
*/
@Getter
private final List<Integer> snapshot;
/**
* 状态互斥锁
*/
private final Lock statusLock = new ReentrantLock();
/**
* 事务状态
*/
Expand Down Expand Up @@ -122,44 +129,68 @@ public TransactionContextImpl(int xid, TransactionIsolation isolation, Transacti
* 开始事务
*/
@Override
public void start() {
this.status = TransactionStatus.ACTIVE;
if (manager != null) {
manager.changeTransactionStatus(xid, TransactionStatus.ACTIVE);
public void start() throws StatusException {
statusLock.lock();
try {
if (this.xid != 0 && !this.status.equals(TransactionStatus.PREPARING)) {
throw new StatusException(1);
}
this.status = TransactionStatus.ACTIVE;
if (manager != null) {
manager.changeTransactionStatus(xid, TransactionStatus.ACTIVE);
}
} finally {
statusLock.unlock();
}

}

/**
* 提交事务
*/
@Override
public void commit() {
// 修改状态
this.status = TransactionStatus.COMMITTED;
if (manager != null) {
manager.changeTransactionStatus(xid, TransactionStatus.COMMITTED);
public void commit() throws StatusException {
statusLock.lock();
try {
if (this.xid != 0 && !this.status.equals(TransactionStatus.ACTIVE)) {
throw new StatusException(1);
}
// 修改状态
this.status = TransactionStatus.COMMITTED;
if (manager != null) {
manager.changeTransactionStatus(xid, TransactionStatus.COMMITTED);
}

// 释放锁
LockTable lockTable = LockTableImpl.getInstance();
lockTable.release(xid);
} finally {
statusLock.unlock();
}

// 释放锁
LockTable lockTable = LockTableImpl.getInstance();
lockTable.release(xid);
}

/**
* 中止事务
*/
@Override
public void rollback() {
// 修改状态
this.status = TransactionStatus.ABORTED;
if (manager != null) {
manager.changeTransactionStatus(xid, TransactionStatus.ABORTED);
public void rollback() throws StatusException {
statusLock.lock();
try {
if (this.xid != 0 && !this.status.equals(TransactionStatus.ACTIVE)) {
throw new StatusException(1);
}

// 修改状态
this.status = TransactionStatus.ABORTED;
if (manager != null) {
manager.changeTransactionStatus(xid, TransactionStatus.ABORTED);
}

// 释放锁
LockTable lockTable = LockTableImpl.getInstance();
lockTable.release(xid);
} finally {
statusLock.unlock();
}

// 释放锁
LockTable lockTable = LockTableImpl.getInstance();
lockTable.release(xid);
}

/**
Expand All @@ -169,10 +200,19 @@ public void rollback() {
* @param tableName 表字段
*/
@Override
public void sharedLock(long uuid, String tableName) throws DeadlockException {
//TODO 加锁
LockTable lockTable = LockTableImpl.getInstance();
lockTable.addSharedLock(xid, uuid, tableName);
public void sharedLock(long uuid, String tableName) throws DeadlockException, StatusException {
statusLock.lock();
try {
if (this.xid != 0 && !this.status.equals(TransactionStatus.ACTIVE)) {
throw new StatusException(1);
}

LockTable lockTable = LockTableImpl.getInstance();
lockTable.addSharedLock(xid, uuid, tableName);
} finally {
statusLock.unlock();
}

}

/**
Expand All @@ -182,10 +222,17 @@ public void sharedLock(long uuid, String tableName) throws DeadlockException {
* @param tableName 表字段
*/
@Override
public void exclusiveLock(long uuid, String tableName) throws DeadlockException {
//TODO 加锁
LockTable lockTable = LockTableImpl.getInstance();
lockTable.addExclusiveLock(xid, uuid, tableName);
public void exclusiveLock(long uuid, String tableName) throws DeadlockException, StatusException {
statusLock.lock();
try {
if (this.xid != 0 && !this.status.equals(TransactionStatus.ACTIVE)) {
throw new StatusException(1);
}
LockTable lockTable = LockTableImpl.getInstance();
lockTable.addExclusiveLock(xid, uuid, tableName);
} finally {
statusLock.unlock();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package net.kaaass.rumbase.transaction.exception;


import net.kaaass.rumbase.exception.RumbaseException;

import java.util.HashMap;
import java.util.Map;

/**
* E6002 事务状态异常
* <p>
* E6002-1 事务状态异常
*
* @author criki
*/
public class StatusException extends RumbaseException {

public static final Map<Integer, String> REASONS = new HashMap<>() {{
put(1, "事务状态异常");
}};

/**
* 事务状态异常
*
* @param subId 子错误号
*/
public StatusException(int subId) {
super(6001, subId, REASONS.get(subId));
}

}
7 changes: 7 additions & 0 deletions src/main/java/net/kaaass/rumbase/transaction/lock/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,11 @@ private boolean dfs(int u) {
visited.put(u, 1);
return false;
}

@Override
public String toString() {
return "Graph{" +
"waitGraph=" + waitGraph +
'}';
}
}
Loading