diff --git a/src/main/java/com/clevertap/stormdb/StormDB.java b/src/main/java/com/clevertap/stormdb/StormDB.java index 483e781..18132e3 100644 --- a/src/main/java/com/clevertap/stormdb/StormDB.java +++ b/src/main/java/com/clevertap/stormdb/StormDB.java @@ -46,6 +46,7 @@ public class StormDB { public static final int RESERVED_KEY_MARKER = 0xffffffff; + public static final int DELETED_KEY_MARKER = 0x80000000; private static final String FILE_NAME_DATA = "data"; private static final String FILE_NAME_WAL = "wal"; @@ -294,6 +295,15 @@ private void buildIndexFromFile(boolean isWal) throws IOException { try { reader.readFromFile(wrapper, false, entry -> { final int key = entry.getInt(); + + if (key == DELETED_KEY_MARKER) { + final int deletedKey = entry.getInt(); + index.remove(deletedKey); + dataInWalFile.clear(deletedKey); + fileIndex[0]++; + return; + } + index.put(key, fileIndex[0]++); if (isWal) { dataInWalFile.set(key); @@ -496,9 +506,10 @@ public void put(int key, byte[] value, int valueOffset) throws IOException { + "last compaction resulted in an exception!", exceptionDuringBackgroundOps); } - if (key == RESERVED_KEY_MARKER) { - throw new ReservedKeyException(RESERVED_KEY_MARKER); + if (key == RESERVED_KEY_MARKER || key == DELETED_KEY_MARKER) { + throw new ReservedKeyException(key); } + rwLock.writeLock().lock(); try { @@ -546,6 +557,44 @@ public void put(int key, byte[] value, int valueOffset) throws IOException { } } + + public void remove(int key) throws IOException { + + if (exceptionDuringBackgroundOps != null) { + throw new StormDBRuntimeException("Will not accept any further writes since the " + + "last compaction resulted in an exception!", exceptionDuringBackgroundOps); + } + + if (key == RESERVED_KEY_MARKER || key == DELETED_KEY_MARKER) { + throw new ReservedKeyException(key); + } + + final int recordIndexForKey = index.get(key); + if (recordIndexForKey == RESERVED_KEY_MARKER) { + return; // no deletion + } + + rwLock.writeLock().lock(); + try { + + index.remove(key); + + ByteBuffer deletedKeyBuffer = ByteBuffer.allocate(conf.getValueSize()); + deletedKeyBuffer.putInt(key); + + if (buffer.isFull()) { + flush(); + synchronized (compactionSync) { + compactionSync.notifyAll(); + } + } + + buffer.add(DELETED_KEY_MARKER, deletedKeyBuffer.array(), 0); + } finally { + rwLock.writeLock().unlock(); + } + } + public void flush() throws IOException { rwLock.writeLock().lock(); try { @@ -613,6 +662,14 @@ private void iterate(final boolean useLatestWalFile, final boolean readInMemoryB final Consumer entryConsumer = entry -> { final int key = entry.getInt(); + + // deleted key logic + if (key == DELETED_KEY_MARKER) { + final int deletedKey = entry.getInt(); + keysRead.set(deletedKey); + return; + } + final boolean b = keysRead.get(key); if (!b) { try { diff --git a/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java b/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java index d108def..0e5d7a3 100644 --- a/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java +++ b/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java @@ -31,4 +31,7 @@ public int get(int key) { public int size() { return indexMap.size(); } + + @Override + public int remove(int key) { return indexMap.remove(key); } } diff --git a/src/main/java/com/clevertap/stormdb/maps/IndexMap.java b/src/main/java/com/clevertap/stormdb/maps/IndexMap.java index 038e45f..d4cbc18 100644 --- a/src/main/java/com/clevertap/stormdb/maps/IndexMap.java +++ b/src/main/java/com/clevertap/stormdb/maps/IndexMap.java @@ -26,4 +26,11 @@ public interface IndexMap { * @return Size of the index. */ int size(); + + /** + * API to remove a particular key from the database return {@link StormDB#RESERVED_KEY_MARKER} + * @param key The key which needs to be removed + * @return the return value stored for that key + */ + int remove(int key); } diff --git a/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java b/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java index 93ce5ba..ff2610a 100644 --- a/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java +++ b/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java @@ -43,6 +43,9 @@ public int get(int key) { public int size() { return kvCache.size(); } + + @Override + public int remove(int key) { return kvCache.remove(key); } }) .build(); diff --git a/src/test/java/com/clevertap/stormdb/StormDBTest.java b/src/test/java/com/clevertap/stormdb/StormDBTest.java index babd241..d54cabe 100644 --- a/src/test/java/com/clevertap/stormdb/StormDBTest.java +++ b/src/test/java/com/clevertap/stormdb/StormDBTest.java @@ -753,4 +753,160 @@ void testInMemoryUpdate() throws IOException, StormDBException { }); assertEquals(3, db.size()); } + + private byte[] getByteArrayForValueWithValueSize(int value, int valueSize) { + ByteBuffer valueBuffer = ByteBuffer.allocate(valueSize); + valueBuffer.putInt(value); + return valueBuffer.array(); + } + + @Test + void testKeysDeletionWithoutCompactionAndClose() throws IOException, StormDBException { + final int valueSize = 28; + final Path path = Files.createTempDirectory("stormdb"); + StormDB db = buildDB(path, valueSize); + // insert some keys + int totalEntries = 500000; + + for (int i = 1; i <= totalEntries; i++) { + db.put(i, getByteArrayForValueWithValueSize(i, valueSize)); + } + + assertEquals(totalEntries, db.size()); // db size should match + + int[] total = {0}; + + db.iterate((key, data, offset) -> { + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + + assertEquals(totalEntries, total[0]); + + // simple deletion + for (int i = 1;i <= totalEntries; i++) { + if (i%2 == 0) { + db.remove(i); + } + } + + assertNotNull(db.randomGet(1)); + assertNull(db.randomGet(2)); + assertEquals(totalEntries / 2, db.size()); // db size should become half + } + + @Test + void testKeysDeletionWithDBClose() throws IOException, StormDBException, InterruptedException{ + final int valueSize = 28; + final Path path = Files.createTempDirectory("stormdb"); + StormDB db = buildDB(path, valueSize); + + int totalEntries = 100; + for (int i = 1; i <= totalEntries; i++) { + db.put(i, getByteArrayForValueWithValueSize(i, valueSize)); + } + + assertEquals(totalEntries, db.size()); + + for (int i = 1; i <= totalEntries; i++) { + if (i % 3 == 0) { + db.remove(i); + } + } + + db.close(); // closing db + + // now the deleted keys should maintain their state + + db = buildDB(path, valueSize); + + assertEquals(totalEntries - totalEntries / 3, db.size()); + + int[] total = {0}; + // checking by iterating + db.iterate((key, data, offset) -> { + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + + assertEquals(totalEntries - totalEntries / 3, total[0]); + + // checking with random get also + for (int i = 1; i <= totalEntries; i++) { + if (i % 3 == 0) { + assertNull(db.randomGet(i)); + } else { + ByteBuffer dataValue = ByteBuffer.wrap(db.randomGet(i), 0, valueSize); + assertEquals(i, dataValue.getInt()); + } + } + } + + @Test + void testKeysDeletionCompact() throws IOException, StormDBException { + final int valueSize = 28; + final Path path = Files.createTempDirectory("stormdb"); + StormDB db = buildDB(path, valueSize); + + db.remove(1); // key = 1 doesn't exist + + db.put(1, getByteArrayForValueWithValueSize(1, valueSize)); + assertEquals(1, db.size()); + assertArrayEquals(getByteArrayForValueWithValueSize(1, valueSize), db.randomGet(1)); + + db.remove(1); + assertEquals(0, db.size()); + assertNull(db.randomGet(1)); + + db.compact(); + + int[] total = {0}; + db.iterate((key, data, offset)->{ + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + assertEquals(0, total[0]); + + total[0] = 0; + db.put(1, getByteArrayForValueWithValueSize(1, valueSize)); + db.iterate((key, data, offset)->{ + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + assertEquals(1, total[0]); + + total[0] = 0; + db.remove(1); + db.iterate((key, data, offset)->{ + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + assertEquals(0, total[0]); + assertEquals(0, db.size()); + } + + @Test + void testDeletionExceptions() throws IOException { + final int valueSize = 28; + final Path path = Files.createTempDirectory("stormdb"); + StormDB db = buildDB(path, valueSize); + + assertThrows(ReservedKeyException.class, () -> { + db.remove(StormDB.DELETED_KEY_MARKER); + }); + + assertThrows(ReservedKeyException.class, () -> { + db.remove(StormDB.RESERVED_KEY_MARKER); + }); + + db.exceptionDuringBackgroundOps = new Exception("testing"); + assertThrows(StormDBRuntimeException.class, () -> { + db.remove(2); + }); + } } \ No newline at end of file