diff --git a/src/main/java/net/kaaass/rumbase/dataitem/IItemStorage.java b/src/main/java/net/kaaass/rumbase/dataitem/IItemStorage.java index 70c6b17..bc0c2e3 100644 --- a/src/main/java/net/kaaass/rumbase/dataitem/IItemStorage.java +++ b/src/main/java/net/kaaass/rumbase/dataitem/IItemStorage.java @@ -2,8 +2,13 @@ import net.kaaass.rumbase.dataitem.exception.PageCorruptedException; import net.kaaass.rumbase.dataitem.exception.UUIDException; +import net.kaaass.rumbase.page.exception.FileException; +import net.kaaass.rumbase.page.exception.PageException; +import net.kaaass.rumbase.recovery.IRecoveryStorage; +import net.kaaass.rumbase.recovery.exception.LogException; import net.kaaass.rumbase.transaction.TransactionContext; +import java.io.IOException; import java.util.List; /** @@ -12,13 +17,44 @@ * @author kaito */ public interface IItemStorage { + + void setMetaUuid(long uuid) throws PageException, IOException; + + /** + * 获得日志管理器 + * + * @return + */ + IRecoveryStorage getRecoveryStorage(); + + /** + * 获取表的tempFreePage + * + * @return + */ + public int getMaxPageId(); + + /** + * 将uuid对应的页强制写回 + */ + public void flush(long uuid) throws FileException; + /** * 插入数据项 * - * @param item 数据项 + * @param txContext 事务上下文 + * @param item 数据项 * @return 返回数据项的UUID */ - long insertItem(TransactionContext txContext, byte[] item); + long insertItem(TransactionContext txContext, byte[] item) throws PageCorruptedException; + + /** + * 不用日志进行插入,用于日志的管理 + * + * @param item 数据项 + * @return uuid + */ + long insertItemWithoutLog(byte[] item); /** * 插入一个有UUID的数据项,唯一使用的地方是日志恢复时使用 @@ -30,7 +66,7 @@ public interface IItemStorage { * @param item 数据项 * @param uuid 编号 */ - void insertItemWithUuid(TransactionContext txContext, byte[] item, long uuid); + void insertItemWithUuid(byte[] item, long uuid); /** * 通过UUID查询数据项 @@ -59,6 +95,8 @@ public interface IItemStorage { */ void updateItemByUuid(TransactionContext txContext, long uuid, byte[] item) throws UUIDException, PageCorruptedException; + byte[] updateItemWithoutLog(long uuid, byte[] item) throws UUIDException; + /** * 获得数据项存储的元数据(可以用于头) * @@ -71,7 +109,14 @@ public interface IItemStorage { * * @param metadata 头信息 */ - void setMetadata(TransactionContext txContext, byte[] metadata) throws PageCorruptedException; + long setMetadata(TransactionContext txContext, byte[] metadata); + + /** + * 不使用日志设置元数据 + * + * @param metadata 头信息 + */ + long setMetadataWithoutLog(byte[] metadata); /** * 清理多余的数据项,空间清理时使用。 @@ -79,6 +124,18 @@ public interface IItemStorage { * @param uuids 数据项UUID的编号列表 */ void removeItems(List uuids); + + /** + * 日志恢复时用,回退对应的insert操作,做法是删除对应的uuid + * + * @param uuid + */ + void deleteUuid(long uuid) throws PageException, IOException; + + /** + * 建议存储进行一次回写 + */ + void flush(); } diff --git a/src/main/java/net/kaaass/rumbase/dataitem/ItemManager.java b/src/main/java/net/kaaass/rumbase/dataitem/ItemManager.java index 609fff6..f86143d 100644 --- a/src/main/java/net/kaaass/rumbase/dataitem/ItemManager.java +++ b/src/main/java/net/kaaass/rumbase/dataitem/ItemManager.java @@ -1,11 +1,12 @@ package net.kaaass.rumbase.dataitem; +import net.kaaass.rumbase.dataitem.exception.PageCorruptedException; import net.kaaass.rumbase.page.exception.FileException; import net.kaaass.rumbase.page.exception.PageException; +import net.kaaass.rumbase.recovery.exception.LogException; import net.kaaass.rumbase.transaction.TransactionContext; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -25,7 +26,7 @@ public class ItemManager { * @param fileName 文件名 * @return 数据项管理器,用于管理数据项 */ - public static IItemStorage fromFile(String fileName) throws FileException, IOException, PageException { + public static IItemStorage fromFile(String fileName) throws FileException, PageException, LogException { if (maps.containsKey(fileName)) { return maps.get(fileName); } else { @@ -35,6 +36,17 @@ public static IItemStorage fromFile(String fileName) throws FileException, IOExc } } + public static IItemStorage fromFileWithoutLog(String fileName) throws FileException, PageException, PageCorruptedException { + if (maps.containsKey(fileName)) { + return maps.get(fileName); + } else { + IItemStorage iItemStorage = ItemStorage.ofFileWithoutLog(fileName); + maps.put(fileName, iItemStorage); + return iItemStorage; + } + } + + /** * 新建一个数据库,并且将上层提供的头信息写入。 * @@ -44,7 +56,7 @@ public static IItemStorage fromFile(String fileName) throws FileException, IOExc * @return 数据项管理器 * @throws FileException 想新建的文件已经存在的异常 */ - public static IItemStorage createFile(TransactionContext txContext, String fileName, byte[] metadata) throws FileException, IOException, PageException { + public static IItemStorage createFile(TransactionContext txContext, String fileName, byte[] metadata) throws FileException, PageException, LogException { // 如果文件已经存在,那么就抛出文件已存在异常 if (maps.containsKey(fileName)) { throw new FileException(1); @@ -54,6 +66,17 @@ public static IItemStorage createFile(TransactionContext txContext, String fileN maps.put(fileName, iItemStorage); return iItemStorage; } + } + public static IItemStorage createFileWithoutLog(String fileName, byte[] metadata) throws FileException, PageException { + // 如果文件已经存在,那就返回 + if (maps.containsKey(fileName)) { + return maps.get(fileName); + } else { + // 若文件不存在,则创建文件。 + IItemStorage iItemStorage = ItemStorage.ofNewFileWithoutLog(fileName, metadata); + maps.put(fileName, iItemStorage); + return iItemStorage; + } } } diff --git a/src/main/java/net/kaaass/rumbase/dataitem/ItemStorage.java b/src/main/java/net/kaaass/rumbase/dataitem/ItemStorage.java index 7e054bb..016c8d1 100644 --- a/src/main/java/net/kaaass/rumbase/dataitem/ItemStorage.java +++ b/src/main/java/net/kaaass/rumbase/dataitem/ItemStorage.java @@ -12,6 +12,8 @@ import net.kaaass.rumbase.page.exception.FileException; import net.kaaass.rumbase.page.exception.PageException; import net.kaaass.rumbase.recovery.IRecoveryStorage; +import net.kaaass.rumbase.recovery.RecoveryManager; +import net.kaaass.rumbase.recovery.exception.LogException; import net.kaaass.rumbase.transaction.TransactionContext; import java.io.IOException; @@ -60,6 +62,14 @@ public class ItemStorage implements IItemStorage { private IRecoveryStorage recoveryStorage; + public ItemStorage(String fileName, int tempFreePage, long headerUuid, PageStorage pageStorage,IRecoveryStorage iRecoveryStorage) { + this.fileName = fileName; + this.tempFreePage = tempFreePage; + this.headerUuid = headerUuid; + this.pageStorage = pageStorage; + this.recoveryStorage = iRecoveryStorage; + } + public ItemStorage(String fileName, int tempFreePage, long headerUuid, PageStorage pageStorage) { this.fileName = fileName; this.tempFreePage = tempFreePage; @@ -77,7 +87,7 @@ public ItemStorage(String fileName, int tempFreePage, long headerUuid, PageStora * @param header 第一页的Page对象 * @return 是否是表的第一页 */ - private static boolean checkTableHeader(Page header) { + private static boolean checkTableHeader(Page header) throws PageCorruptedException{ var data = header.getData(); byte[] flag = new byte[4]; try { @@ -94,14 +104,48 @@ private static boolean checkTableHeader(Page header) { * @param fileName 文件名 * @return 解析或新建得到的数据项管理器对象 */ - public static IItemStorage ofFile(String fileName) throws FileException, PageException { + public static IItemStorage ofFile(String fileName) throws FileException, PageException, LogException { + var pageStorage = PageManager.fromFile(fileName); + var header = pageStorage.get(0); + header.pin(); + try { + if (checkTableHeader(header)) { + // 如果表头标志存在,就解析对应表头信息 + var h = parseTableHeader(header); + var logStorage = RecoveryManager.getRecoveryStorage(fileName); + return new ItemStorage(fileName, h.tempFreePage, h.headerUuid, pageStorage,logStorage); + } else { + // 若表头标志不存在,就初始化对应的表信息。 + // 只初始化headerFlag和tempFreePage,表头信息位置统一由setMetadata来实现 + byte[] bytes; + try { + bytes = JBBPOut.BeginBin(). + Byte(1, 2, 3, 4). + Int(1). + End().toByteArray(); + } catch (IOException e) { + throw new PageCorruptedException(1, e); + } + header.patchData(0, bytes); + var logStorage = RecoveryManager.createRecoveryStorage(fileName); + return new ItemStorage(fileName, 1, 0, pageStorage,logStorage); + } + } finally { + header.unpin(); + } + } + + /** + * 不使用日志打开文件 + */ + public static IItemStorage ofFileWithoutLog(String fileName) throws FileException, PageException, PageCorruptedException { var pageStorage = PageManager.fromFile(fileName); var header = pageStorage.get(0); header.pin(); try { if (checkTableHeader(header)) { // 如果表头标志存在,就解析对应表头信息 - var h = parseHeader(header); + var h = parseTableHeader(header); return new ItemStorage(fileName, h.tempFreePage, h.headerUuid, pageStorage); } else { // 若表头标志不存在,就初始化对应的表信息。 @@ -113,6 +157,7 @@ public static IItemStorage ofFile(String fileName) throws FileException, PageExc Int(1). End().toByteArray(); } catch (IOException e) { + header.unpin(); throw new PageCorruptedException(1, e); } header.patchData(0, bytes); @@ -130,12 +175,21 @@ public static IItemStorage ofFile(String fileName) throws FileException, PageExc * @param metadata 表头信息 * @return 数据项管理器 */ - public static IItemStorage ofNewFile(TransactionContext txContext, String fileName, byte[] metadata) throws IOException, FileException, PageException { + public static IItemStorage ofNewFile(TransactionContext txContext, String fileName, byte[] metadata) throws FileException, PageException, LogException { var pageStorage = ItemStorage.ofFile(fileName); pageStorage.setMetadata(txContext, metadata); return pageStorage; } + /** + * 不使用日志的创建文件 + */ + public static IItemStorage ofNewFileWithoutLog(String fileName, byte[] metadata) throws FileException, PageException { + var pageStorage = ItemStorage.ofFileWithoutLog(fileName); + pageStorage.setMetadataWithoutLog(metadata); + return pageStorage; + } + /** * 根据uuid获取后面的随机位置 */ @@ -176,7 +230,7 @@ private void releasePage(Page page) { * * @return 解析得到的表头对象 */ - private static TableHeader parseHeader(Page page) throws PageCorruptedException { + private static TableHeader parseTableHeader(Page page) throws PageCorruptedException { try { return JBBPParser.prepare("int headerFlag;int tempFreePage;byte hasHeaderInfo;long headerUuid;"). parse(page.getData()).mapTo(new TableHeader()); @@ -213,7 +267,7 @@ private static byte[] parseData(Page page, Item item) throws PageCorruptedExcept * @param page 页 * @return 该页是否已经被初始化 */ - private boolean checkPageHeader(Page page) { + private boolean checkPageHeader(Page page) throws PageCorruptedException{ byte[] pageFlag = new byte[4]; try { var n = page.getData().read(pageFlag); @@ -240,7 +294,7 @@ private Optional getPageHeader(Page page) { return Optional.empty(); } - private PageHeader initPage(Page page) { + private PageHeader initPage(Page page) throws PageCorruptedException { final byte[] bytes; try { bytes = JBBPOut.BeginBin(). @@ -259,7 +313,7 @@ private PageHeader initPage(Page page) { /** * 修改当前第一个可用页 */ - private void addTempFreePage() { + private void addTempFreePage() throws PageCorruptedException { this.tempFreePage += 1; var page = pageStorage.get(0); page.pin(); @@ -277,7 +331,55 @@ private void addTempFreePage() { } @Override - public synchronized long insertItem(TransactionContext txContext, byte[] item) { + public void setMetaUuid(long uuid) throws PageException, IOException { + var page = getPage(0); + try { + var bytes = JBBPOut.BeginBin(). + Byte(HAS_HEADER). + Long(uuid).End().toByteArray(); + page.patchData(HEADER_OFFSET, bytes); + }finally { + releasePage(page); + } + + } + + @Override + public IRecoveryStorage getRecoveryStorage() { + return this.recoveryStorage; + } + + @Override + public int getMaxPageId() { + var page = getPage(0); + try{ + var header = parseTableHeader(page); + return header.tempFreePage; + }finally { + releasePage(page); + } + } + + @Override + public void flush(long uuid) throws FileException { + var page = getPage(uuid); + releasePage(page); + page.flush(); + } + + @Override + public synchronized long insertItem(TransactionContext txContext, byte[] item) throws PageCorruptedException { + long uuid = insertItemWithoutLog(item); + try { + recoveryStorage.insert(txContext.getXid(),uuid,item); + } catch (LogException e) { + log.warn("文件 {} 日志写入出现故障,触发回写", this.fileName, e); + flush(); + } + return uuid; + } + @Override + public synchronized long insertItemWithoutLog(byte[] item) throws PageCorruptedException{ var page = getPage(this.tempFreePage); try { var pageHeaderOp = getPageHeader(page); @@ -289,7 +391,7 @@ public synchronized long insertItem(TransactionContext txContext, byte[] item) { if (pageHeader.leftSpace - Math.min(item.length, MAX_RECORD_SIZE) <= MIN_LEFT_SPACE) { // 如果剩余空间过小的话,就切换到下一个页进行,同时修改表头信息.并且,若数据过大则使用拉链,所以取512和数据大小较小的 addTempFreePage(); - return insertItem(txContext, item); + return insertItemWithoutLog(item); } else { // 剩余空间足够,则插入 int rnd = Math.abs(new Random().nextInt()); @@ -300,7 +402,7 @@ public synchronized long insertItem(TransactionContext txContext, byte[] item) { rnd = Math.abs(new Random().nextInt()); uuid = ((s << 32) + (long) (rnd)); } - insertToPage(page, pageHeader, txContext, item, rnd); + insertToPage(page, pageHeader,item, rnd); return uuid; } } catch (PageCorruptedException e) { @@ -315,7 +417,7 @@ public synchronized long insertItem(TransactionContext txContext, byte[] item) { /** * 将数据插入到页内对应位置,并修改页头信息 */ - private void insertToPage(Page page, PageHeader pageHeader, TransactionContext txContext, byte[] item, int rnd) { + private void insertToPage(Page page, PageHeader pageHeader,byte[] item, int rnd) { if (item.length < MAX_RECORD_SIZE) { int offset = 0; if (pageHeader.recordNumber == 0) { @@ -355,7 +457,7 @@ private void insertToPage(Page page, PageHeader pageHeader, TransactionContext t } @Override - public synchronized void insertItemWithUuid(TransactionContext txContext, byte[] item, long uuid) { + public synchronized void insertItemWithUuid(byte[] item, long uuid) { if (!checkUuidExist(uuid)) { // 若不存在,则要恢复 var page = getPage(uuid); @@ -366,7 +468,7 @@ public synchronized void insertItemWithUuid(TransactionContext txContext, byte[] pageHeaderOp = Optional.of(initPage(page)); } int rnd = getRndByUuid(uuid); - insertToPage(page, pageHeaderOp.get(), txContext, item, rnd); + insertToPage(page, pageHeaderOp.get(), item, rnd); } catch (Exception e) { throw new PageCorruptedException(3); } finally { @@ -376,6 +478,35 @@ public synchronized void insertItemWithUuid(TransactionContext txContext, byte[] // 若存在则不需要恢复,直接返回 } + + @Override + public void deleteUuid(long uuid) throws PageException, IOException { + var page = getPage(uuid); + try { + var rnd = getRndByUuid(uuid); + var header = getPageHeader(page); + if (header.isPresent()){ + var items = header.get().item; + for (int i = 0;i < items.length ; i ++){ + if (rnd == items[i].uuid){ + // 该item对应的起止位置,将其数据项变为-1 + int offset = ITEM_OFFSET + i * ITEM_SIZE; + var bytes = JBBPOut.BeginBin().Int(-1).Int(0).End().toByteArray(); + page.patchData(offset,bytes); + return; + } + } + } + }finally { + releasePage(page); + } + } + + @Override + public void flush() { + pageStorage.flush(); + } + /** * 检查uuid是否存在,若Uuid的页号超过当前可用页,则直接返回False * @@ -448,8 +579,12 @@ public List listItemByPageId(int pageId) { if (pageHeaderOp.isPresent()) { var pageHeader = pageHeaderOp.get(); for (var item : pageHeader.item) { - var data = parseData(page, item); - bytes.add(data); + if (item.uuid < 0){ + continue; + }else { + var data = parseData(page, item); + bytes.add(data); + } } } return bytes; @@ -462,14 +597,27 @@ public List listItemByPageId(int pageId) { @Override public void updateItemByUuid(TransactionContext txContext, long uuid, byte[] item) throws UUIDException, PageCorruptedException { + var item_before = updateItemWithoutLog(uuid, item); + try { + recoveryStorage.update(txContext.getXid(),uuid,item_before,item); + } catch (LogException e) { + log.warn("文件 {} 日志写入出现故障,触发回写", this.fileName, e); + flush(); + } + } + + @Override + public byte[] updateItemWithoutLog(long uuid, byte[] item) throws UUIDException { var page = getPage(uuid); try { if (checkUuidExist(uuid)) { + // 若uuid存在 var pageHeader = getPageHeader(page); var items = pageHeader.get().item; try { for (var i : items) { if (i.uuid == getRndByUuid(uuid)) { + var item_before = parseData(page,i); var offset = i.offset; var bytes = JBBPOut.BeginBin(). Byte(NORMAL_DATA). @@ -477,24 +625,26 @@ public void updateItemByUuid(TransactionContext txContext, long uuid, byte[] ite .Byte(item) .End().toByteArray(); page.patchData(offset, bytes); - return; + return item_before; } } } catch (PageException | IOException e) { throw new PageCorruptedException(2, e); } } else { + // uuid不存在一般是恢复的时候 前面事务被取消. throw new UUIDException(2); } } finally { releasePage(page); } + return new byte[1]; } @Override public byte[] getMetadata() { var page = getPage(0); - var header = parseHeader(page); + var header = parseTableHeader(page); if (checkTableHeader(page) && header.hasHeaderInfo == HAS_HEADER) { // 若表头已经被初始化并且有标志位的话,就说明有表头信息,进行获取. byte[] h; @@ -515,14 +665,34 @@ public byte[] getMetadata() { } @Override - public void setMetadata(TransactionContext txContext, byte[] metadata) throws PageCorruptedException { + public long setMetadata(TransactionContext txContext, byte[] metadata) throws PageCorruptedException { + var page = getPage(0); + try{ + var header = parseTableHeader(page); + var uuid = setMetadataWithoutLog(metadata); + try { + recoveryStorage.updateMeta(txContext.getXid(),header.headerUuid,metadata); + } catch (LogException e) { + log.warn("文件 {} 日志写入出现故障,触发回写", this.fileName, e); + flush(); + } + return uuid; + }finally { + releasePage(page); + } + + } + + @Override + public long setMetadataWithoutLog(byte[] metadata) throws PageCorruptedException { var page = getPage(0); try { - var headerUuid = insertItem(txContext, metadata); + var headerUuid = insertItemWithoutLog(metadata); var bytes = JBBPOut.BeginBin(). Byte(HAS_HEADER). Long(headerUuid).End().toByteArray(); page.patchData(HEADER_OFFSET, bytes); + return headerUuid; } catch (Exception e) { throw new PageCorruptedException(1, e); } finally { diff --git a/src/main/java/net/kaaass/rumbase/dataitem/mock/MockItemStorage.java b/src/main/java/net/kaaass/rumbase/dataitem/mock/MockItemStorage.java index 19e6fd5..e2fdad7 100644 --- a/src/main/java/net/kaaass/rumbase/dataitem/mock/MockItemStorage.java +++ b/src/main/java/net/kaaass/rumbase/dataitem/mock/MockItemStorage.java @@ -2,9 +2,13 @@ import lombok.Data; import net.kaaass.rumbase.dataitem.IItemStorage; +import net.kaaass.rumbase.dataitem.exception.PageCorruptedException; import net.kaaass.rumbase.dataitem.exception.UUIDException; +import net.kaaass.rumbase.page.exception.PageException; +import net.kaaass.rumbase.recovery.IRecoveryStorage; import net.kaaass.rumbase.transaction.TransactionContext; +import java.io.IOException; import java.util.*; /** @@ -79,6 +83,26 @@ public static IItemStorage ofNewFile(TransactionContext txContext, String fileNa } + @Override + public void setMetaUuid(long uuid) { + + } + + @Override + public IRecoveryStorage getRecoveryStorage() { + return null; + } + + @Override + public int getMaxPageId() { + return 0; + } + + @Override + public void flush(long uuid) { + + } + @Override public long insertItem(TransactionContext txContext, byte[] item) { Random ran = new Random(); @@ -88,7 +112,12 @@ public long insertItem(TransactionContext txContext, byte[] item) { } @Override - public void insertItemWithUuid(TransactionContext txContext, byte[] item, long uuid) { + public long insertItemWithoutLog( byte[] item) { + return 0; + } + + @Override + public void insertItemWithUuid(byte[] item, long uuid) { maps.put(uuid, item); } @@ -115,14 +144,26 @@ public void updateItemByUuid(TransactionContext txContext, long uuid, byte[] ite } } + @Override + public byte[] updateItemWithoutLog(long uuid, byte[] item) throws UUIDException { + return new byte[0]; + } + @Override public byte[] getMetadata() { return meta; } @Override - public void setMetadata(TransactionContext txContext, byte[] metadata) { + public long setMetadata(TransactionContext txContext, byte[] metadata) { this.meta = metadata; + return 0; + } + + @Override + public long setMetadataWithoutLog(byte[] metadata) throws PageCorruptedException { + + return 0; } @@ -131,6 +172,15 @@ public void removeItems(List uuids) { System.out.println("已经清除文件对应uuid的信息"); } + @Override + public void deleteUuid(long uuid) throws IOException, PageException { + + } + + @Override + public void flush() { + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/net/kaaass/rumbase/page/RumPageStorage.java b/src/main/java/net/kaaass/rumbase/page/RumPageStorage.java index d94dc23..1a827af 100644 --- a/src/main/java/net/kaaass/rumbase/page/RumPageStorage.java +++ b/src/main/java/net/kaaass/rumbase/page/RumPageStorage.java @@ -1,21 +1,22 @@ package net.kaaass.rumbase.page; +import lombok.extern.slf4j.Slf4j; import net.kaaass.rumbase.page.exception.BufferException; import net.kaaass.rumbase.page.exception.FileException; import java.io.*; import java.util.Arrays; -import java.util.HashMap; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** * @author 11158 */ +@Slf4j public class RumPageStorage implements PageStorage { - public RumPageStorage(String filepath) throws FileException { + public RumPageStorage(String filepath) { this.filepath = filepath; - pageMap = new HashMap<>(); + pageMap = new ConcurrentHashMap<>(); } @Override @@ -28,7 +29,8 @@ public Page get(long pageId) { FileOutputStream out = new FileOutputStream(file); out.write(new byte[PageManager.PAGE_SIZE * 10]); } catch (IOException e) { - e.printStackTrace(); + log.error("创建文件失败", e); + System.exit(1); } } //文件会预留5页作为文件头 @@ -40,7 +42,7 @@ public Page get(long pageId) { while (in.available() < (pageId + 1 + PageManager.FILE_HEAD_SIZE) * PageManager.PAGE_SIZE) { FileWriter fw = new FileWriter(file, true); char[] blank = new char[PageManager.PAGE_SIZE * (in.available() / PageManager.PAGE_SIZE)]; - Arrays.fill(blank, (char)0); + Arrays.fill(blank, (char) 0); fw.write(blank); fw.close(); } @@ -59,7 +61,8 @@ public Page get(long pageId) { } int offset = -1; while (offset < 0) { - synchronized (RumBuffer.getInstance()) {//并非区间锁,而是将整个内存全部锁住 + //并非区间锁,而是将整个内存全部锁住 + synchronized (RumBuffer.getInstance()) { try { offset = RumBuffer.getInstance().getFreeOffset(); RumBuffer.getInstance().put(offset, data); @@ -86,12 +89,15 @@ public Page get(long pageId) { @Override public void flush() { - Set> entrySet = this.pageMap.entrySet(); - for (Map.Entry entry : entrySet) { + var entrySet = this.pageMap.entrySet(); + for (var entry : entrySet) { + var page = entry.getValue(); try { - entry.getValue().flush(); + if (page instanceof RumPage && ((RumPage) page).dirty()) { + page.flush(); + } } catch (Exception e) { - e.printStackTrace(); + log.warn("回写页面 {} 发生异常", entry.getKey()); } } } diff --git a/src/main/java/net/kaaass/rumbase/record/IRecordStorage.java b/src/main/java/net/kaaass/rumbase/record/IRecordStorage.java index abe0919..cfbba76 100644 --- a/src/main/java/net/kaaass/rumbase/record/IRecordStorage.java +++ b/src/main/java/net/kaaass/rumbase/record/IRecordStorage.java @@ -1,8 +1,10 @@ package net.kaaass.rumbase.record; +import net.kaaass.rumbase.page.exception.FileException; import net.kaaass.rumbase.record.exception.RecordNotFoundException; import net.kaaass.rumbase.transaction.TransactionContext; +import java.io.IOException; import java.util.Optional; /** diff --git a/src/main/java/net/kaaass/rumbase/record/MvccRecordStorage.java b/src/main/java/net/kaaass/rumbase/record/MvccRecordStorage.java index 29a6a0a..de87af2 100644 --- a/src/main/java/net/kaaass/rumbase/record/MvccRecordStorage.java +++ b/src/main/java/net/kaaass/rumbase/record/MvccRecordStorage.java @@ -7,14 +7,17 @@ import lombok.extern.slf4j.Slf4j; import net.kaaass.rumbase.dataitem.IItemStorage; import net.kaaass.rumbase.dataitem.exception.UUIDException; +import net.kaaass.rumbase.page.exception.FileException; import net.kaaass.rumbase.record.exception.NeedRollbackException; import net.kaaass.rumbase.record.exception.RecordNotFoundException; import net.kaaass.rumbase.record.exception.StorageCorruptedException; +import net.kaaass.rumbase.recovery.exception.LogException; import net.kaaass.rumbase.transaction.TransactionContext; import net.kaaass.rumbase.transaction.TransactionIsolation; import net.kaaass.rumbase.transaction.TransactionStatus; import net.kaaass.rumbase.transaction.exception.DeadlockException; +import java.io.IOException; import java.util.Optional; /** diff --git a/src/main/java/net/kaaass/rumbase/recovery/IRecoveryStorage.java b/src/main/java/net/kaaass/rumbase/recovery/IRecoveryStorage.java index 0684071..d692842 100644 --- a/src/main/java/net/kaaass/rumbase/recovery/IRecoveryStorage.java +++ b/src/main/java/net/kaaass/rumbase/recovery/IRecoveryStorage.java @@ -1,5 +1,10 @@ package net.kaaass.rumbase.recovery; +import net.kaaass.rumbase.page.exception.FileException; +import net.kaaass.rumbase.page.exception.PageException; +import net.kaaass.rumbase.recovery.exception.LogException; + +import java.io.IOException; import java.util.List; /** @@ -14,21 +19,21 @@ public interface IRecoveryStorage { * @param xid 事务编号 * @param snapshots 快照集合 */ - void begin(int xid, List snapshots); + void begin(int xid, List snapshots) throws LogException; /** * 记录事务失败回滚 * * @param xid */ - void rollback(int xid); + void rollback(int xid) throws LogException; /** * 记录事务完成 * * @param xid */ - void commit(int xid); + void commit(int xid) throws LogException; /** * 插入数据项的日志记录 @@ -37,27 +42,30 @@ public interface IRecoveryStorage { * @param uuid 数据项的对应编号 * @param item 插入的数据内容 */ - void insert(int xid, long uuid, byte[] item); + void insert(int xid, long uuid, byte[] item) throws LogException; /** * 更新数据项的日志记录 * * @param xid * @param uuid - * @param item */ - void update(int xid, long uuid, byte[] item); + void update(int xid, long uuid, byte[] itemBefore, byte[] itemAfter) throws LogException; /** * 更新数据项的日志头 * * @param xid - * @param metaUUID 头信息的UUID */ - void updateMeta(int xid, long metaUUID); + void updateMeta(int xid, long beforeUuid,byte[] metadata) throws LogException; /** * 模拟打印日志资料 */ List getContent(); + + /** + * 恢复数据 + */ + void recovery() throws LogException; } diff --git a/src/main/java/net/kaaass/rumbase/recovery/RecoveryManager.java b/src/main/java/net/kaaass/rumbase/recovery/RecoveryManager.java index 4fbc18a..509fff8 100644 --- a/src/main/java/net/kaaass/rumbase/recovery/RecoveryManager.java +++ b/src/main/java/net/kaaass/rumbase/recovery/RecoveryManager.java @@ -1,21 +1,43 @@ package net.kaaass.rumbase.recovery; +import lombok.extern.slf4j.Slf4j; +import net.kaaass.rumbase.page.exception.FileException; +import net.kaaass.rumbase.page.exception.PageException; +import net.kaaass.rumbase.recovery.exception.LogException; import net.kaaass.rumbase.recovery.mock.MockRecoveryStorage; +import java.io.IOException; + /** * 日志恢复的管理器,用来对每个数据库文件进行恢复 + * FIXME 需要同时在所有文件记录事务的发生 * * @author kaito */ +@Slf4j public class RecoveryManager { /** - * 对某个数据库文件进行恢复,并且返回对应的日志管理器 + * 对某个数据库文件进行恢复 * * @param fileName 文件名 * @return 数据库日志管理器 */ - public static IRecoveryStorage recovery(String fileName) { - // TODO:对数据进行恢复 - return new MockRecoveryStorage(); + public static void recovery(String fileName) throws LogException { + IRecoveryStorage recoveryStorage; + try { + recoveryStorage = RecoveryStorage.ofFile(fileName); + } catch (LogException e) { + log.debug("恢复日志不存在,忽略该文件的恢复", e); + return; + } + recoveryStorage.recovery(); + } + + public static IRecoveryStorage getRecoveryStorage(String fileName) throws LogException { + return RecoveryStorage.ofFile(fileName); + } + + public static IRecoveryStorage createRecoveryStorage(String fileName) throws LogException { + return RecoveryStorage.ofNewFile(fileName); } } diff --git a/src/main/java/net/kaaass/rumbase/recovery/RecoveryStorage.java b/src/main/java/net/kaaass/rumbase/recovery/RecoveryStorage.java new file mode 100644 index 0000000..700a7e2 --- /dev/null +++ b/src/main/java/net/kaaass/rumbase/recovery/RecoveryStorage.java @@ -0,0 +1,457 @@ +package net.kaaass.rumbase.recovery; + +import com.igormaznitsa.jbbp.JBBPParser; +import com.igormaznitsa.jbbp.io.JBBPOut; +import com.igormaznitsa.jbbp.mapper.Bin; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import net.kaaass.rumbase.dataitem.IItemStorage; +import net.kaaass.rumbase.dataitem.ItemManager; +import net.kaaass.rumbase.dataitem.exception.UUIDException; +import net.kaaass.rumbase.page.exception.FileException; +import net.kaaass.rumbase.page.exception.PageException; +import net.kaaass.rumbase.recovery.exception.LogException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 日志记录相关内容 + * + * @author kaito + */ +@Slf4j +public class RecoveryStorage implements IRecoveryStorage { + + + /** + * 用于对自己的日志记录进行管理 + */ + private IItemStorage logStorage; + /** + * 要恢复的对象对应的文件名 + */ + private String fileName; + + public RecoveryStorage(IItemStorage itemStorage, String fileName) { + this.logStorage = itemStorage; + this.fileName = fileName; + } + + /** + * 读取日志文件并且解析得到日志管理器 + * + * @param fileName + * @return 日志管理器 + */ + public static IRecoveryStorage ofFile(String fileName) throws LogException { + try { + var itemStorage = ItemManager.fromFileWithoutLog(fileName + ".log"); + return new RecoveryStorage(itemStorage, fileName); + } catch (FileException | PageException e) { + throw new LogException(10); + } + } + + /** + * 创建日志文件 + * + * @param fileName 文件名 + * @return + */ + public static IRecoveryStorage ofNewFile(String fileName) throws LogException { + try { + var metadata = JBBPOut.BeginBin().Int(HEADER).End().toByteArray(); + var itemStorage = ItemManager.createFileWithoutLog(fileName + ".log", metadata); + return new RecoveryStorage(itemStorage, fileName); + } catch (IOException | FileException | PageException e) { + throw new LogException(9, e); + } + } + + @Override + public void begin(int xid, List snapshots) throws LogException { + try { + var jbbp = JBBPOut.BeginBin(). + Byte(TX_BEGIN). + Int(xid). + Int(snapshots.size()); + for (var i : snapshots) { + jbbp = jbbp.Int(i); + } + var bytes = jbbp.End().toByteArray(); + var uuid = logStorage.insertItemWithoutLog(bytes); + logStorage.flush(uuid); + } catch (IOException | FileException e) { + throw new LogException(3); + } + } + + @Override + public void rollback(int xid) throws LogException { + try { + var bytes = JBBPOut.BeginBin(). + Byte(TX_ABORT). + Int(xid). + End().toByteArray(); + var uuid = logStorage.insertItemWithoutLog(bytes); + logStorage.flush(uuid); + } catch (FileException | IOException e) { + throw new LogException(4); + } + } + + @Override + public void commit(int xid) throws LogException { + try { + var bytes = JBBPOut.BeginBin(). + Byte(TX_COMMIT). + Int(xid). + End().toByteArray(); + var uuid = logStorage.insertItemWithoutLog(bytes); + logStorage.flush(uuid); + } catch (FileException | IOException e) { + throw new LogException(5); + } + + } + + @Override + public void insert(int xid, long uuid, byte[] item) throws LogException { + try { + var bytes = JBBPOut.BeginBin(). + Byte(INSERT_FLAG). + Int(xid). + Long(uuid). + Int(item.length). + Byte(item). + End().toByteArray(); + var id = logStorage.insertItemWithoutLog(bytes); + logStorage.flush(id); + } catch (FileException | IOException e) { + throw new LogException(6); + } + } + + @Override + public void update(int xid, long uuid, byte[] itemBefore, byte[] itemAfter) throws LogException { + try { + var bytes = JBBPOut.BeginBin(). + Byte(UPDATE_FLAG). + Int(xid). + Long(uuid). + Int(itemBefore.length). + Byte(itemBefore). + Int(itemAfter.length). + Byte(itemAfter). + End().toByteArray(); + var id = logStorage.insertItemWithoutLog(bytes); + logStorage.flush(id); + } catch (FileException | IOException e) { + throw new LogException(7); + } + } + + @Override + public void updateMeta(int xid, long beforeUuid, byte[] metadata) throws LogException { + try { + var bytes = JBBPOut.BeginBin(). + Byte(METADATA_UPDATE_FLAG). + Int(xid). + Long(beforeUuid). + Int(metadata.length). + Byte(metadata). + End().toByteArray(); + var uuid = logStorage.insertItemWithoutLog(bytes); + logStorage.flush(uuid); + } catch (FileException | IOException e) { + throw new LogException(8); + } + + } + + @Override + public List getContent() { + List logList = new ArrayList<>(); + var maxPageId = logStorage.getMaxPageId(); + for (int i = 1; i <= maxPageId; i++) { + var logs = logStorage.listItemByPageId(i); + for (var l : logs) { + if (l.length > 4) { + logList.add(l); + } + } + } + return logList; + } + + @Override + public void recovery() throws LogException { + try { + var itemStorage = ItemManager.fromFile(this.fileName); + // TODO 判断是否进行恢复 + log.info("开始恢复文件 {}...", this.fileName); + var logs = getContent(); + var xidMaps = parseXid(logs); + log.debug("文件 {} 回滚事务:{}", this.fileName, xidMaps); + // 逐条回滚 + for (var log : logs) { + parseLog(log, itemStorage, xidMaps); + } + } catch (FileException | PageException e) { + throw new LogException(11); + } + } + + /** + * 先进行一遍解析,得到所有已完成的事务和未完成的事务,来决定redo还是undo + */ + private Map> parseXid(List logs) throws LogException { + List commitXids = List.of(0); + List abortXids = new ArrayList<>(); + + for (var binLog : logs) { + Tx tx; + try { + tx = parseTx(binLog); + } catch (IOException e) { + throw new LogException(1); + } + switch (tx.type) { + case TX_COMMIT: + // 若是commit 则放入commitXid并将对应的xid移出abort + Integer id = tx.xid; + abortXids.remove(id); + commitXids.add(tx.xid); + break; + case TX_ABORT: + // 因为begin就放入abort,所以abort不用管 + break; + case TX_BEGIN: + abortXids.add(tx.xid); + break; + default: + } + } + + Map> maps = new HashMap<>(); + maps.put('C', commitXids); + maps.put('A', abortXids); + return maps; + } + + /** + * 解析事务状态 + */ + private Tx parseTx(byte[] log) throws IOException { + var tx = JBBPParser.prepare("byte type;int xid;").parse(log).mapTo(new Tx()); + return tx; + } + + /** + * 解析类型 + */ + private byte parseType(byte[] log) throws IOException { + var type = JBBPParser.prepare("byte type;").parse(log).mapTo(new Type()).type; + return type; + } + + /** + * 解析插入 + */ + private InsertLog parseInsert(byte[] log) throws IOException { + var insertLog = JBBPParser.prepare("byte type;int xid;long uuid;int length;byte[length] item;") + .parse(log).mapTo(new InsertLog()); + return insertLog; + } + + /** + * 解析更新 + */ + private UpdateLog parseUpdate(byte[] log) throws IOException { + var updateLog = JBBPParser.prepare("byte type;int xid;long uuid;int length1;" + + "byte[length1] itemBefore;int length2;byte[length2] itemAfter;").parse(log).mapTo(new UpdateLog()); + return updateLog; + } + + private UpdateMetaLog parseUpdateMeta(byte[] log) throws IOException { + var updateMeta = JBBPParser.prepare("byte type;int xid;long beforeUuid;" + + "int length;byte[length] metadata;"). + parse(log).mapTo(new UpdateMetaLog()); + return updateMeta; + } + + private boolean checkCommit(int xid, Map> maps) throws LogException { + if (maps.get('C').contains(xid)) { + return true; + } else if (maps.get('A').contains(xid)) { + return false; + } else { + throw new LogException(2); + } + } + + private void parseLog(byte[] log, IItemStorage itemStorage, Map> xidMaps) throws LogException { + try { + var type = parseType(log); + switch (type) { + case INSERT_FLAG: + System.out.println("正在解析插入"); + var insertLog = parseInsert(log); + if (checkCommit(insertLog.xid, xidMaps)) { + // 如果事务已经提交,则redo + itemStorage.insertItemWithUuid(insertLog.item, insertLog.uuid); + } else { + // 如果事务没有提交,则undo + itemStorage.deleteUuid(insertLog.uuid); + } + break; + case UPDATE_FLAG: + System.out.println("正在解析更新"); + var updateLog = parseUpdate(log); + if (checkCommit(updateLog.xid, xidMaps)) { + // 若事务已经提交则redo + try { + itemStorage.updateItemWithoutLog(updateLog.uuid, updateLog.itemAfter); + } catch (UUIDException ignored) { + // uuid不存在说明对应之前的事务没有执行,是正常 + } + } else { + // 若事务没有提交,则undo,恢复之前的数据 + try { + itemStorage.updateItemWithoutLog(updateLog.uuid, updateLog.itemBefore); + } catch (UUIDException ignored) { + + } + } + break; + case METADATA_UPDATE_FLAG: + System.out.println("正在解析头信息更新"); + var updateMetaLog = parseUpdateMeta(log); + if (checkCommit(updateMetaLog.xid, xidMaps)) { + // redo + itemStorage.setMetadataWithoutLog(updateMetaLog.metadata); + } else { + // undo + itemStorage.setMetaUuid(updateMetaLog.beforeUuid); + } + break; + default: + return; + } + } catch (IOException | PageException e) { + throw new LogException(1); + } + } + + /** + * 日志头信息 + * TODO: 记录点等特殊要求 + */ + public static class LogHeader { + @Bin + int header; + } + + /** + * 解析类型 + */ + public static class Type { + @Bin + byte type; + } + + /** + * 事务状态的解析 + */ + @Data + public static class Tx { + @Bin + byte type; + @Bin + int xid; + } + + /** + * 插入的解析 + */ + public static class InsertLog { + @Bin + byte type; + @Bin + int xid; + @Bin + long uuid; + @Bin + int length; + @Bin + byte[] item; + + public Object newInstance(Class klazz) { + return klazz == InsertLog.class ? new InsertLog() : null; + } + } + + /** + * 更新的解析 + */ + public static class UpdateLog { + @Bin + byte type; + @Bin + int xid; + @Bin + long uuid; + @Bin + int length1; + @Bin + byte[] itemBefore; + @Bin + int length2; + @Bin + byte[] itemAfter; + + public Object newInstance(Class klazz) { + return klazz == UpdateLog.class ? new UpdateLog() : null; + } + } + + public static class UpdateMetaLog { + @Bin + byte type; + @Bin + int xid; + @Bin + long beforeUuid; + @Bin + int length; + @Bin + byte[] metadata; + + public Object newInstance(Class klazz) { + return klazz == UpdateMetaLog.class ? new UpdateMetaLog() : null; + } + } + + /** + * 日志标志 + */ + final private static int HEADER = 4567; + + final private static byte INSERT_FLAG = 'i'; + + final private static byte TX_BEGIN = 'b'; + + final private static byte TX_ABORT = 'a'; + + final private static byte TX_COMMIT = 'c'; + + final private static byte UPDATE_FLAG = 'u'; + + final private static byte METADATA_UPDATE_FLAG = 'm'; + + +} diff --git a/src/main/java/net/kaaass/rumbase/recovery/exception/LogException.java b/src/main/java/net/kaaass/rumbase/recovery/exception/LogException.java new file mode 100644 index 0000000..bc44422 --- /dev/null +++ b/src/main/java/net/kaaass/rumbase/recovery/exception/LogException.java @@ -0,0 +1,35 @@ +package net.kaaass.rumbase.recovery.exception; + +import net.kaaass.rumbase.exception.RumbaseException; + +import java.util.HashMap; +import java.util.Map; + +public class LogException extends RumbaseException { + /** + * 构造Rumbase异常 + * + */ + + public static final Map REASONS = new HashMap() {{ + put(1, "日志文件无法解析"); + put(2, "事务ID不存在"); + put(3, "事务提交日志写回错误"); + put(4, "事务回滚日志写回错误"); + put(5, "事务提交日志写回错误"); + put(6, "插入数据项日志写回错误"); + put(7, "更新数据项日志写回错误"); + put(8, "更新头信息日志写回错误"); + put(9, "日志文件创建失败"); + put(10, "日志文件打开失败"); + put(11, "日志获取原文件失败"); + }}; + + public LogException(int subId) { + super(3001,subId,REASONS.get(subId)); + } + + public LogException(int subId, Throwable e) { + super(3001,subId,REASONS.get(subId), e); + } +} diff --git a/src/main/java/net/kaaass/rumbase/recovery/mock/MockRecoveryStorage.java b/src/main/java/net/kaaass/rumbase/recovery/mock/MockRecoveryStorage.java index fa8c4e9..b22049d 100644 --- a/src/main/java/net/kaaass/rumbase/recovery/mock/MockRecoveryStorage.java +++ b/src/main/java/net/kaaass/rumbase/recovery/mock/MockRecoveryStorage.java @@ -2,6 +2,7 @@ import net.kaaass.rumbase.recovery.IRecoveryStorage; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -49,19 +50,17 @@ public void insert(int xid, long uuid, byte[] item) { } @Override - public void update(int xid, long uuid, byte[] item) { + public void update(int xid, long uuid, byte[] itemBefore, byte[] itemAfter) { String updateStr = "update " + xid + " " + uuid + " "; // 对控制语句、数据两部分进行合并得到最终日志记录 byte[] first = updateStr.getBytes(); - byte[] result = Arrays.copyOf(first, first.length + item.length); - System.arraycopy(item, 0, result, first.length, item.length); + byte[] result = Arrays.copyOf(first, first.length + itemBefore.length); + System.arraycopy(itemBefore, 0, result, first.length, itemBefore.length); bytes.add(result); } @Override - public void updateMeta(int xid, long metaUUID) { - String updateMetaStr = "meta " + xid + " " + metaUUID; - bytes.add(updateMetaStr.getBytes()); + public void updateMeta(int xid,long beforeUuid,byte[] metadata) { } @Override @@ -69,5 +68,7 @@ public List getContent() { return bytes; } - + @Override + public void recovery() { + } } diff --git a/src/main/java/net/kaaass/rumbase/server/Server.java b/src/main/java/net/kaaass/rumbase/server/Server.java index d3783d9..ba6a9ee 100644 --- a/src/main/java/net/kaaass/rumbase/server/Server.java +++ b/src/main/java/net/kaaass/rumbase/server/Server.java @@ -8,6 +8,8 @@ import net.kaaass.rumbase.page.exception.FileException; import net.kaaass.rumbase.query.exception.ArgumentException; import net.kaaass.rumbase.record.exception.RecordNotFoundException; +import net.kaaass.rumbase.recovery.RecoveryManager; +import net.kaaass.rumbase.recovery.exception.LogException; import net.kaaass.rumbase.table.TableManager; import net.kaaass.rumbase.table.exception.TableConflictException; import net.kaaass.rumbase.table.exception.TableExistenceException; @@ -51,10 +53,14 @@ public class Server { */ public void prepare() { // 准备文件夹 - var tableFolder = new File("data/table/a"); - assert tableFolder.exists() || tableFolder.mkdirs(); - var indexFolder = new File("data/index/a"); - assert indexFolder.exists() || indexFolder.mkdirs(); + var tableFolder = new File("data/table/"); + if (!tableFolder.exists()) { + tableFolder.mkdirs(); + } + var indexFolder = new File("data/index/"); + if (!indexFolder.exists()) { + indexFolder.mkdirs(); + } // 初始化事务管理器 log.info("初始化事务管理器..."); try { @@ -63,15 +69,23 @@ public void prepare() { log.error("初始化事务管理器失败", e); System.exit(1); } + // 恢复表管理器 + try { + RecoveryManager.recovery("data/metadata.db"); + } catch (LogException e) { + log.error("无法恢复表管理器,数据可能损坏!", e); + System.exit(1); + } // 初始化表管理器 log.info("初始化表管理器..."); - // TODO 先恢复metadata try { tableManager = new TableManager(); - } catch (TableExistenceException | TableConflictException | RecordNotFoundException | ArgumentException | IndexAlreadyExistException e) { + } catch (IndexAlreadyExistException e) { log.error("初始化表管理器失败", e); System.exit(1); } + // 恢复、载入其他表 + tableManager.prepare(); // 初始化线程池 log.info("初始化线程池..."); var namedThreadFactory = Executors.defaultThreadFactory(); diff --git a/src/main/java/net/kaaass/rumbase/table/TableManager.java b/src/main/java/net/kaaass/rumbase/table/TableManager.java index ecb925a..b4ff1c8 100644 --- a/src/main/java/net/kaaass/rumbase/table/TableManager.java +++ b/src/main/java/net/kaaass/rumbase/table/TableManager.java @@ -1,11 +1,14 @@ package net.kaaass.rumbase.table; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import net.kaaass.rumbase.index.exception.IndexAlreadyExistException; import net.kaaass.rumbase.query.exception.ArgumentException; import net.kaaass.rumbase.record.IRecordStorage; import net.kaaass.rumbase.record.RecordManager; import net.kaaass.rumbase.record.exception.RecordNotFoundException; +import net.kaaass.rumbase.recovery.RecoveryManager; +import net.kaaass.rumbase.recovery.exception.LogException; import net.kaaass.rumbase.table.exception.TableConflictException; import net.kaaass.rumbase.table.field.BaseField; import net.kaaass.rumbase.table.exception.TableExistenceException; @@ -25,7 +28,7 @@ * * @author @KveinAxel */ - +@Slf4j public class TableManager { static { @@ -68,11 +71,11 @@ public void abort(TransactionContext context) { context.rollback(); } - public TableManager() throws TableExistenceException, TableConflictException, RecordNotFoundException, ArgumentException, IndexAlreadyExistException { + public TableManager() throws IndexAlreadyExistException { load(); } - public void load() throws TableExistenceException, TableConflictException, RecordNotFoundException, ArgumentException, IndexAlreadyExistException { + public void load() throws IndexAlreadyExistException { var context = TransactionContext.empty(); Table metaTable; try { @@ -102,29 +105,56 @@ public void load() throws TableExistenceException, TableConflictException, Recor metaTable.persist(context); tableCache.put("metadata", metaTable); } - var data = metaTable.readAll(context); + } + + /** + * 启动服务器前进行的准备 + */ + public void prepare() { + var context = TransactionContext.empty(); + var metaTable = tableCache.get("metadata"); + List> data = null; + try { + data = metaTable.readAll(context); + } catch (TableExistenceException | TableConflictException | ArgumentException | RecordNotFoundException e) { + log.error("无法读入元数据表,数据可能损坏!", e); + System.exit(1); + } + // 载入所有已有表 var map = new HashMap(); data.forEach(row -> map.put((String) row.get(0), (String) row.get(1))); if (!map.containsKey("table_num")) { - metaTable.insert(context, new ArrayList<>(){{ - add("'table_num'"); - add("'0'"); - }}); + try { + metaTable.insert(context, new ArrayList<>(){{ + add("'table_num'"); + add("'0'"); + }}); + } catch (TableConflictException | TableExistenceException | ArgumentException e) { + log.error("无法初始化元数据表", e); + System.exit(1); + } map.put("table_num", "0"); } for (var item: map.entrySet()) { if (item.getKey().startsWith("tablePath$")) { var tableName = item.getKey().split("\\$")[1]; + var tablePath = item.getValue(); + // 恢复表 + try { + RecoveryManager.recovery(tablePath); + } catch (LogException e) { + log.error("无法恢复表 {} 于 {},数据可能损坏!", tableName, tablePath, e); + System.exit(1); + } + // 读入表 var record = RecordManager.fromFile(item.getValue()); - recordPaths.add(item.getValue()); + recordPaths.add(tablePath); var table = Table.load(record); tableCache.put(tableName, table); } } - - } /** diff --git a/src/main/java/net/kaaass/rumbase/transaction/TransactionContextImpl.java b/src/main/java/net/kaaass/rumbase/transaction/TransactionContextImpl.java index bb803bd..6a0f04e 100644 --- a/src/main/java/net/kaaass/rumbase/transaction/TransactionContextImpl.java +++ b/src/main/java/net/kaaass/rumbase/transaction/TransactionContextImpl.java @@ -2,6 +2,8 @@ import lombok.Getter; import lombok.Setter; +import net.kaaass.rumbase.recovery.RecoveryManager; +import net.kaaass.rumbase.recovery.RecoveryStorage; import net.kaaass.rumbase.transaction.exception.DeadlockException; import net.kaaass.rumbase.transaction.lock.LockTable; import net.kaaass.rumbase.transaction.lock.LockTableImpl; diff --git a/src/test/java/net/kaaass/rumbase/dataitem/IItemStorageTest.java b/src/test/java/net/kaaass/rumbase/dataitem/IItemStorageTest.java index cd6f433..c0fad1f 100644 --- a/src/test/java/net/kaaass/rumbase/dataitem/IItemStorageTest.java +++ b/src/test/java/net/kaaass/rumbase/dataitem/IItemStorageTest.java @@ -1,15 +1,14 @@ package net.kaaass.rumbase.dataitem; -import junit.framework.TestCase; import lombok.extern.slf4j.Slf4j; import net.kaaass.rumbase.FileUtil; import net.kaaass.rumbase.dataitem.exception.PageCorruptedException; import net.kaaass.rumbase.dataitem.exception.UUIDException; import net.kaaass.rumbase.page.exception.FileException; import net.kaaass.rumbase.page.exception.PageException; +import net.kaaass.rumbase.recovery.exception.LogException; import net.kaaass.rumbase.transaction.TransactionContext; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -32,6 +31,8 @@ @Slf4j public class IItemStorageTest { + private static final String PATH = FileUtil.TEST_PATH; + @BeforeClass public static void createDataFolder() { FileUtil.prepare(); @@ -42,37 +43,35 @@ public static void clearDataFolder() { FileUtil.clear(); } - private static final String PATH = FileUtil.TEST_PATH; - /** * 测试能否从已有文件中解析得到数据项管理器 */ @Test - public void testGetFromFile() throws FileException, IOException, PageException { + public void testGetFromFile() throws FileException, PageException, LogException { String fileName = PATH + "testGetFromFile.db"; - var itemStorage = ItemManager.fromFile(fileName); + ItemManager.fromFile(fileName); // 如果表中没有对应的文件,那么就抛出错误 -// String failFileName = "error.db"; -// try { -// IItemStorage iItemStorage1 = ItemManager.fromFile(failFileName); -// } catch (FileException f) { -// log.error("Exception Error :", f); -// } + String failFileName = "error.db"; + try { + ItemManager.fromFile(failFileName); + } catch (FileException f) { + log.error("Exception Error :", f); + } } /** * 测试能否新建文件并得到数据项管理器 */ @Test - public void testCreateFile() throws IOException, FileException, PageException { + public void testCreateFile() throws FileException, PageException, LogException { TransactionContext txContext = TransactionContext.empty(); String fileName = PATH + "testCreateFile.db"; byte[] metadata = new byte[1024]; // 第一次执行的时候,表中没有数据,不会报错 - var iItemStorage = ItemManager.createFile(txContext, fileName, metadata); + ItemManager.createFile(txContext, fileName, metadata); try { - iItemStorage = ItemManager.createFile(txContext, fileName, metadata); + ItemManager.createFile(txContext, fileName, metadata); fail("should get exception"); } catch (Exception e) { log.error("Exception Error :", e); @@ -82,8 +81,7 @@ public void testCreateFile() throws IOException, FileException, PageException { /** * 进行插入的测试 */ - @Test - public void testInsert() throws FileException, IOException, PageException, UUIDException, PageCorruptedException { + public void testInsert() throws FileException, IOException, PageException, UUIDException, PageCorruptedException, LogException { String fileName = PATH + "testInsert.db"; IItemStorage iItemStorage = ItemManager.fromFile(fileName); byte[] bytes = new byte[]{1, 2, 3, 4}; @@ -100,8 +98,7 @@ public void testInsert() throws FileException, IOException, PageException, UUIDE /** * 对插入一个已分配UUID的测试 */ - @Test - public void testInsertWithUUID() throws FileException, IOException, PageException { + public void testInsertWithUUID() throws FileException, IOException, PageException, LogException { String fileName = PATH + "testInsertWithUUID.db"; IItemStorage iItemStorage = ItemManager.fromFile(fileName); byte[] bytes = new byte[]{1, 2, 3, 4}; @@ -110,7 +107,7 @@ public void testInsertWithUUID() throws FileException, IOException, PageExceptio long uuid = (s << 32) + rnd; TransactionContext txContext = TransactionContext.empty(); // 第一次插入,表中没有该UUID,可以正常执行 - iItemStorage.insertItemWithUuid(txContext, bytes, uuid); + iItemStorage.insertItemWithUuid(bytes, uuid); try { assertArrayEquals(bytes, iItemStorage.queryItemByUuid(uuid)); } catch (UUIDException | PageCorruptedException e) { @@ -118,15 +115,14 @@ public void testInsertWithUUID() throws FileException, IOException, PageExceptio } // 第二次插入 - iItemStorage.insertItemWithUuid(txContext, bytes, uuid); + iItemStorage.insertItemWithUuid(bytes, uuid); } /** * 对插入大量数据进行测试 */ - @Test - public void testManyInsert() throws FileException, IOException, PageException, UUIDException, PageCorruptedException { + public void testManyInsert() throws FileException, IOException, PageException, UUIDException, PageCorruptedException, LogException { String fileName = PATH + "testInsertMany.db"; IItemStorage iItemStorage = ItemManager.fromFile(fileName); byte[] bytes = new byte[]{1, 2, 3, 4}; @@ -146,8 +142,7 @@ public void testManyInsert() throws FileException, IOException, PageException, U /** * 获取整个页的数据项进行测试 */ - @Test - public void testQueryByPageID() throws FileException, IOException, PageException, PageCorruptedException { + public void testQueryByPageID() throws FileException, IOException, PageException, PageCorruptedException, LogException { String fileName = PATH + "testQueryByPageID.db"; IItemStorage iItemStorage = ItemManager.fromFile(fileName); byte[] bytes = new byte[]{1, 2, 3, 4}; @@ -182,36 +177,10 @@ public void testQueryByPageID() throws FileException, IOException, PageException } } - - static class Insert implements Runnable { - IItemStorage iItemStorage; - TransactionContext txContext; - - public Insert(IItemStorage iItemStorage, TransactionContext txContext) { - this.iItemStorage = iItemStorage; - this.txContext = txContext; - } - - @Override - public void run() { - var bytes = new byte[]{1, 2, 3, 4}; - try { - for (int i = 0; i < 100; i++) { - long uuid = iItemStorage.insertItem(txContext, bytes); - assertArrayEquals(bytes, iItemStorage.queryItemByUuid(uuid)); - } - } catch (Exception e) { - e.printStackTrace(); - fail("Exception caught"); - } - } - } - /** * 测试并发下插入是否有问题 */ - @Test - public void testSynInsert() throws IOException, FileException, PageException { + public void testSynInsert() throws IOException, FileException, PageException, LogException { String fileName = PATH + "testInsert.db"; IItemStorage iItemStorage = ItemManager.fromFile(fileName); byte[] bytes = new byte[]{1, 2, 3, 4}; @@ -222,12 +191,10 @@ public void testSynInsert() throws IOException, FileException, PageException { new Thread(new Insert(iItemStorage, txContext)).start(); } - /** * 对更新进行测试 */ - @Test - public void testUpdate() throws FileException, IOException, PageException, UUIDException, PageCorruptedException { + public void testUpdate() throws FileException, IOException, PageException, UUIDException, PageCorruptedException, LogException { String fileName = PATH + "testUpdate.db"; TransactionContext txContext = TransactionContext.empty(); IItemStorage iItemStorage = ItemManager.fromFile(fileName); @@ -254,8 +221,7 @@ public void testUpdate() throws FileException, IOException, PageException, UUIDE /** * 测试修改和获取表头信息 */ - @Test - public void testMeta() throws FileException, IOException, PageException, UUIDException, PageCorruptedException { + public void testMeta() throws FileException, IOException, PageException, UUIDException, PageCorruptedException, LogException { String fileName = PATH + "testMeta.db"; IItemStorage iItemStorage = ItemManager.fromFile(fileName); byte[] result = new byte[]{1, 2, 3, 4}; @@ -265,4 +231,28 @@ public void testMeta() throws FileException, IOException, PageException, UUIDExc assertArrayEquals(result, bs); } + static class Insert implements Runnable { + IItemStorage iItemStorage; + TransactionContext txContext; + + public Insert(IItemStorage iItemStorage, TransactionContext txContext) { + this.iItemStorage = iItemStorage; + this.txContext = txContext; + } + + @Override + public void run() { + var bytes = new byte[]{1, 2, 3, 4}; + try { + for (int i = 0; i < 100; i++) { + long uuid = iItemStorage.insertItem(txContext, bytes); + assertArrayEquals(bytes, iItemStorage.queryItemByUuid(uuid)); + } + } catch (Exception e) { + e.printStackTrace(); + fail("Exception caught"); + } + } + } + } diff --git a/src/test/java/net/kaaass/rumbase/recovery/IRecoveryTest.java b/src/test/java/net/kaaass/rumbase/recovery/IRecoveryTest.java index a94f553..9de183a 100644 --- a/src/test/java/net/kaaass/rumbase/recovery/IRecoveryTest.java +++ b/src/test/java/net/kaaass/rumbase/recovery/IRecoveryTest.java @@ -2,34 +2,196 @@ import junit.framework.TestCase; import lombok.extern.slf4j.Slf4j; +import net.kaaass.rumbase.FileUtil; +import net.kaaass.rumbase.dataitem.IItemStorage; +import net.kaaass.rumbase.dataitem.ItemManager; +import net.kaaass.rumbase.dataitem.exception.UUIDException; +import net.kaaass.rumbase.page.exception.FileException; +import net.kaaass.rumbase.page.exception.PageException; +import net.kaaass.rumbase.recovery.exception.LogException; +import net.kaaass.rumbase.transaction.TransactionContext; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; - -import static org.junit.Assert.assertArrayEquals; +import java.util.Random; /** - * TODO 文档 + * 对日志进行保存和恢复 */ @Slf4j -public class IRecoveryTest extends TestCase { - - public void testBegin() { - IRecoveryStorage iRecoveryStorage = RecoveryManager.recovery("user.db"); - List l = new ArrayList<>(); - int xid = 1; - l.add(3); - l.add(2); - iRecoveryStorage.begin(xid, l); - - var content = iRecoveryStorage.getContent(); - String beginStr = "begin " + xid; - String snapStr = "snap " + l.toString(); - List result = new ArrayList<>(); - result.add(beginStr.getBytes()); - result.add(snapStr.getBytes()); - assertEquals(result.size(), content.size()); - assertArrayEquals(result.get(0), content.get(0)); - assertArrayEquals(result.get(1), content.get(1)); +public class IRecoveryTest { + + @BeforeClass + public static void createDataFolder() { + FileUtil.prepare(); + } + + @AfterClass + public static void clearDataFolder() { + FileUtil.clear(); + } + + public static final String PATH = FileUtil.TEST_PATH; + + @Test + public void testInsert() throws PageException, LogException, FileException, IOException, UUIDException { + String fileName = PATH + "testInsert.db"; + IItemStorage iItemStorage = ItemManager.fromFile(fileName); + byte[] bytes = new byte[]{1, 2, 3, 4}; + var txContext = TransactionContext.empty(); + var recoveryStorage = iItemStorage.getRecoveryStorage(); + var xid = 1; + var xid2 = 0; + List snaps = new ArrayList<>(); + // 测试日志中存在,但是表中不存在进行恢复 + recoveryStorage.begin(xid, snaps); + long a = 1; + long uuid = (a << 32) + Math.abs(new Random().nextInt()); + recoveryStorage.insert(xid, uuid, bytes); + recoveryStorage.commit(xid); + + // 测试日志中存在,且数据中也存在,会不会重复插入 + + recoveryStorage.begin(xid2, snaps); + iItemStorage.insertItem(txContext, bytes); + recoveryStorage.commit(xid2); + + recoveryStorage.recovery(); + // 测试日志有额外的数据有没有被插入 + var result = iItemStorage.queryItemByUuid(uuid); + Assert.assertTrue(Arrays.equals(result, bytes)); + // 测试表中有的数据是否被重复插入 + var list = iItemStorage.listItemByPageId(1); + Assert.assertEquals(2, list.size()); + + } + + @Test + public void testInsertFail() throws PageException, LogException, FileException, IOException, UUIDException { + String fileName = PATH + "testInsertFailed.db"; + IItemStorage iItemStorage = ItemManager.fromFile(fileName); + byte[] bytes = new byte[]{1, 2, 3, 4}; + var txContext = TransactionContext.empty(); + var recoveryStorage = iItemStorage.getRecoveryStorage(); + var xid = 0; + List snaps = new ArrayList<>(); + recoveryStorage.begin(xid, snaps); + long uuid = iItemStorage.insertItem(txContext, bytes); + recoveryStorage.rollback(xid); + recoveryStorage.recovery(); + try { + var item = iItemStorage.queryItemByUuid(uuid); + Assert.assertFalse(Arrays.equals(bytes, item)); + } catch (Exception e) { + + } + + } + + @Test + public void testUpdate() throws PageException, LogException, FileException, IOException, UUIDException { + String fileName = PATH + "testUpdate.db"; + IItemStorage iItemStorage = ItemManager.fromFile(fileName); + byte[] bytes = new byte[]{1, 2, 3, 4}; + byte[] bytesUpdate = new byte[]{2, 3, 4, 5}; + var txContext = TransactionContext.empty(); + var recoveryStorage = iItemStorage.getRecoveryStorage(); + + int xid = 0; + List snaps = new ArrayList<>(); + + // 测试在表中存在的redo + recoveryStorage.begin(xid, snaps); + long uuid = iItemStorage.insertItem(txContext, bytes); + iItemStorage.updateItemByUuid(txContext, uuid, bytesUpdate); + recoveryStorage.commit(xid); + + // 测试在表中不存在的redo + int xid2 = 1; + recoveryStorage.begin(xid2, snaps); + long a = 1; + long uuid2 = (a << 32) + Math.abs(new Random().nextInt()); + recoveryStorage.insert(xid2, uuid2, bytes); + recoveryStorage.update(xid2, uuid2, bytes, bytesUpdate); + recoveryStorage.commit(xid2); + + // 测试abort的事务 + int xid3 = 2; + recoveryStorage.begin(xid3, snaps); + long b = 1; + long uuid3 = (b << 32) + Math.abs(new Random().nextInt()); + recoveryStorage.insert(xid3, uuid3, bytes); + recoveryStorage.commit(xid3); + int xid4 = 3; + recoveryStorage.begin(xid4, snaps); + recoveryStorage.update(xid4, uuid3, bytes, bytesUpdate); + recoveryStorage.rollback(xid4); + + recoveryStorage.recovery(); + + Assert.assertTrue(Arrays.equals(bytesUpdate, iItemStorage.queryItemByUuid(uuid))); + Assert.assertTrue(Arrays.equals(bytesUpdate, iItemStorage.queryItemByUuid(uuid2))); + Assert.assertTrue(Arrays.equals(bytes, iItemStorage.queryItemByUuid(uuid3))); + var list = iItemStorage.listItemByPageId(1); + Assert.assertEquals(3, list.size()); + } + + @Test + public void testUpdateMeta() throws PageException, LogException, FileException, IOException { + String fileName = PATH + "testUpdateMeta.db"; + IItemStorage iItemStorage = ItemManager.fromFile(fileName); + byte[] bytes = new byte[]{1, 2, 3, 4}; + byte[] bytesUpdate = new byte[]{2, 3, 4, 5}; + var txContext = TransactionContext.empty(); + var recoveryStorage = iItemStorage.getRecoveryStorage(); + + // 测试表中有 + int xid = 0; + List snaps = new ArrayList<>(); + recoveryStorage.begin(xid, snaps); + var uuid = iItemStorage.setMetadata(txContext, bytes); + recoveryStorage.commit(xid); + + Assert.assertTrue(Arrays.equals(bytes, iItemStorage.getMetadata())); + // 测试表中没有头信息时更新 + int xid2 = 1; + recoveryStorage.begin(xid2, snaps); + recoveryStorage.updateMeta(xid2, uuid, bytesUpdate); + recoveryStorage.commit(xid2); + recoveryStorage.recovery(); + Assert.assertTrue(Arrays.equals(bytesUpdate, iItemStorage.getMetadata())); + + } + + @Test + public void testUpdateMetaFail() throws PageException, LogException, FileException, IOException { + String fileName = PATH + "testUpdateMetaFail.db"; + IItemStorage iItemStorage = ItemManager.fromFile(fileName); + byte[] bytes = new byte[]{1, 2, 3, 4}; + byte[] bytesUpdate = new byte[]{2, 3, 4, 5}; + var txContext = TransactionContext.empty(); + var recoveryStorage = iItemStorage.getRecoveryStorage(); + + // 测试表中有 + int xid = 0; + List snaps = new ArrayList<>(); + recoveryStorage.begin(xid, snaps); + var uuid = iItemStorage.setMetadata(txContext, bytes); + recoveryStorage.commit(xid); + + Assert.assertTrue(Arrays.equals(bytes, iItemStorage.getMetadata())); + // 测试事务失败 + int xid2 = 1; + recoveryStorage.begin(xid2, snaps); + recoveryStorage.updateMeta(xid2, uuid, bytesUpdate); + recoveryStorage.rollback(xid2); + recoveryStorage.recovery(); + Assert.assertTrue(Arrays.equals(bytes, iItemStorage.getMetadata())); } }