From 6a8ee9eae98a220e998a55510684b5d35c30a4cf Mon Sep 17 00:00:00 2001 From: Akshay Date: Fri, 30 Oct 2020 01:49:00 +0530 Subject: [PATCH 1/5] deletion logic added --- .../java/com/clevertap/stormdb/StormDB.java | 52 +++++++++++++++- .../stormdb/maps/DefaultIndexMap.java | 3 + .../com/clevertap/stormdb/maps/IndexMap.java | 7 +++ .../clevertap/stormdb/CustomIndexMapTest.java | 3 + .../com/clevertap/stormdb/StormDBTest.java | 60 +++++++++++++++++++ 5 files changed, 123 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/clevertap/stormdb/StormDB.java b/src/main/java/com/clevertap/stormdb/StormDB.java index 483e781..0544e93 100644 --- a/src/main/java/com/clevertap/stormdb/StormDB.java +++ b/src/main/java/com/clevertap/stormdb/StormDB.java @@ -46,6 +46,7 @@ public class StormDB { public static final int RESERVED_KEY_MARKER = 0xffffffff; + public static final int DELETED_KEY_MARKER = 0x80000000; private static final String FILE_NAME_DATA = "data"; private static final String FILE_NAME_WAL = "wal"; @@ -496,9 +497,10 @@ public void put(int key, byte[] value, int valueOffset) throws IOException { + "last compaction resulted in an exception!", exceptionDuringBackgroundOps); } - if (key == RESERVED_KEY_MARKER) { - throw new ReservedKeyException(RESERVED_KEY_MARKER); + if (key == RESERVED_KEY_MARKER || key == DELETED_KEY_MARKER) { + throw new ReservedKeyException(key); } + rwLock.writeLock().lock(); try { @@ -546,6 +548,40 @@ public void put(int key, byte[] value, int valueOffset) throws IOException { } } + + public void remove(int key) throws IOException { + + if (exceptionDuringBackgroundOps != null) { + throw new StormDBRuntimeException("Will not accept any further writes since the " + + "last compaction resulted in an exception!", exceptionDuringBackgroundOps); + } + + if (key == RESERVED_KEY_MARKER || key == DELETED_KEY_MARKER) { + throw new ReservedKeyException(key); + } + + rwLock.writeLock().lock(); + try { + + index.remove(key); + + ByteBuffer deletedKeyBuffer = ByteBuffer.allocate(recordSize); + deletedKeyBuffer.putInt(key); + + if (buffer.isFull()) { + flush(); + synchronized (compactionSync) { + compactionSync.notifyAll(); + } + } + + buffer.add(DELETED_KEY_MARKER, deletedKeyBuffer.array(), 0); + + } finally { + rwLock.writeLock().unlock(); + } + } + public void flush() throws IOException { rwLock.writeLock().lock(); try { @@ -613,6 +649,14 @@ private void iterate(final boolean useLatestWalFile, final boolean readInMemoryB final Consumer entryConsumer = entry -> { final int key = entry.getInt(); + + // deleted key logic + if (key == DELETED_KEY_MARKER) { + final int deletedKey = entry.getInt(); + keysRead.set(deletedKey); + return; + } + final boolean b = keysRead.get(key); if (!b) { try { @@ -758,4 +802,8 @@ public boolean isUsingExecutorService() { public int size() { return index.size(); } + + public static void main(String[] args) { + System.out.println(0x80000000); + } } diff --git a/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java b/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java index d108def..0e5d7a3 100644 --- a/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java +++ b/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java @@ -31,4 +31,7 @@ public int get(int key) { public int size() { return indexMap.size(); } + + @Override + public int remove(int key) { return indexMap.remove(key); } } diff --git a/src/main/java/com/clevertap/stormdb/maps/IndexMap.java b/src/main/java/com/clevertap/stormdb/maps/IndexMap.java index 038e45f..d4cbc18 100644 --- a/src/main/java/com/clevertap/stormdb/maps/IndexMap.java +++ b/src/main/java/com/clevertap/stormdb/maps/IndexMap.java @@ -26,4 +26,11 @@ public interface IndexMap { * @return Size of the index. */ int size(); + + /** + * API to remove a particular key from the database return {@link StormDB#RESERVED_KEY_MARKER} + * @param key The key which needs to be removed + * @return the return value stored for that key + */ + int remove(int key); } diff --git a/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java b/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java index 93ce5ba..ff2610a 100644 --- a/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java +++ b/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java @@ -43,6 +43,9 @@ public int get(int key) { public int size() { return kvCache.size(); } + + @Override + public int remove(int key) { return kvCache.remove(key); } }) .build(); diff --git a/src/test/java/com/clevertap/stormdb/StormDBTest.java b/src/test/java/com/clevertap/stormdb/StormDBTest.java index babd241..63e957c 100644 --- a/src/test/java/com/clevertap/stormdb/StormDBTest.java +++ b/src/test/java/com/clevertap/stormdb/StormDBTest.java @@ -753,4 +753,64 @@ void testInMemoryUpdate() throws IOException, StormDBException { }); assertEquals(3, db.size()); } + + @Test + void testKeysDeletion() throws IOException, StormDBException, InterruptedException{ + final int valueSize = 28; + final Path path = Files.createTempDirectory("stormdb"); + StormDB db = buildDB(path, valueSize); + // insert some keys + int totalEntries = 115; + ByteBuffer value = ByteBuffer.allocate(valueSize); + for (int i = 1; i <= totalEntries; i++) { + value.clear(); + value.putInt(i); + db.put(i, value.array()); + } + + int[] total = {0}; + + db.iterate((key, data, offset) -> { + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + + assertEquals(totalEntries, total[0]); + // now delete every even number + total[0] = 0; + + // simple deletion + for (int i = 1;i <= totalEntries; i++) { + if (i%2 == 0) { + db.remove(i); + } + if (i % 3 == 0) { + value.clear(); + value.putInt(i); + db.put(i, value.array()); + } + } + db.iterate((key, data, offset) -> { + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + assertEquals(77, total[0]); + + // compaction based deletion + db.compact(); + total[0] = 0; + + db.close(); + + db = buildDB(path, 28); + + db.iterate((key, data, offset) -> { + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + assertEquals(77, total[0]); + } } \ No newline at end of file From ef1c2d77b7982bd92912d3ddde4932c995bf3e32 Mon Sep 17 00:00:00 2001 From: Akshay Date: Fri, 30 Oct 2020 01:52:19 +0530 Subject: [PATCH 2/5] extra code removed --- src/main/java/com/clevertap/stormdb/StormDB.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/com/clevertap/stormdb/StormDB.java b/src/main/java/com/clevertap/stormdb/StormDB.java index 0544e93..fc446b1 100644 --- a/src/main/java/com/clevertap/stormdb/StormDB.java +++ b/src/main/java/com/clevertap/stormdb/StormDB.java @@ -802,8 +802,4 @@ public boolean isUsingExecutorService() { public int size() { return index.size(); } - - public static void main(String[] args) { - System.out.println(0x80000000); - } } From dc67acc653f97fd00b827f555ba4ec23800763ea Mon Sep 17 00:00:00 2001 From: Akshay Date: Fri, 30 Oct 2020 02:31:48 +0530 Subject: [PATCH 3/5] tests fixed for coverage --- .../java/com/clevertap/stormdb/StormDB.java | 3 +- .../com/clevertap/stormdb/StormDBTest.java | 58 ++++++++++++++----- 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/clevertap/stormdb/StormDB.java b/src/main/java/com/clevertap/stormdb/StormDB.java index fc446b1..19ab163 100644 --- a/src/main/java/com/clevertap/stormdb/StormDB.java +++ b/src/main/java/com/clevertap/stormdb/StormDB.java @@ -565,7 +565,7 @@ public void remove(int key) throws IOException { index.remove(key); - ByteBuffer deletedKeyBuffer = ByteBuffer.allocate(recordSize); + ByteBuffer deletedKeyBuffer = ByteBuffer.allocate(conf.getValueSize()); deletedKeyBuffer.putInt(key); if (buffer.isFull()) { @@ -576,7 +576,6 @@ public void remove(int key) throws IOException { } buffer.add(DELETED_KEY_MARKER, deletedKeyBuffer.array(), 0); - } finally { rwLock.writeLock().unlock(); } diff --git a/src/test/java/com/clevertap/stormdb/StormDBTest.java b/src/test/java/com/clevertap/stormdb/StormDBTest.java index 63e957c..a3e74fb 100644 --- a/src/test/java/com/clevertap/stormdb/StormDBTest.java +++ b/src/test/java/com/clevertap/stormdb/StormDBTest.java @@ -755,12 +755,12 @@ void testInMemoryUpdate() throws IOException, StormDBException { } @Test - void testKeysDeletion() throws IOException, StormDBException, InterruptedException{ + void testKeysDeletionNormal() throws IOException, StormDBException, InterruptedException{ final int valueSize = 28; final Path path = Files.createTempDirectory("stormdb"); StormDB db = buildDB(path, valueSize); // insert some keys - int totalEntries = 115; + int totalEntries = 500000; ByteBuffer value = ByteBuffer.allocate(valueSize); for (int i = 1; i <= totalEntries; i++) { value.clear(); @@ -785,32 +785,58 @@ void testKeysDeletion() throws IOException, StormDBException, InterruptedExcepti if (i%2 == 0) { db.remove(i); } - if (i % 3 == 0) { - value.clear(); - value.putInt(i); - db.put(i, value.array()); - } } db.iterate((key, data, offset) -> { ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); assertEquals(key, dataValue.getInt()); total[0]++; }); - assertEquals(77, total[0]); + assertEquals(totalEntries/2, total[0]); + } - // compaction based deletion - db.compact(); - total[0] = 0; + @Test + public void testKeysDeletionComplex() throws IOException, StormDBException { + final int valueSize = 28; + final Path path = Files.createTempDirectory("stormdb"); + StormDB db = buildDB(path, valueSize); - db.close(); + ByteBuffer value = ByteBuffer.allocate(valueSize); + value.putInt(1); + db.put(1, value.array()); + assertArrayEquals(value.array(), db.randomGet(1)); - db = buildDB(path, 28); + db.remove(1); + assertNull(db.randomGet(1)); - db.iterate((key, data, offset) -> { + db.put(1, value.array()); + assertArrayEquals(value.array(), db.randomGet(1)); + + int[] total = {0}; + db.iterate((key, data, offset)->{ ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); - assertEquals(key, dataValue.getInt()); + assertEquals(key, dataValue.getInt()); total[0]++; }); - assertEquals(77, total[0]); + assertEquals(1, total[0]); + } + + @Test + public void testDeletionExceptions() throws IOException { + final int valueSize = 28; + final Path path = Files.createTempDirectory("stormdb"); + StormDB db = buildDB(path, valueSize); + + assertThrows(ReservedKeyException.class, () -> { + db.remove(StormDB.DELETED_KEY_MARKER); + }); + + assertThrows(ReservedKeyException.class, () -> { + db.remove(StormDB.RESERVED_KEY_MARKER); + }); + + assertThrows(StormDBRuntimeException.class, () -> { + db.exceptionDuringBackgroundOps = new Exception("testing"); + db.remove(2); + }); } } \ No newline at end of file From ae919f13cc73e894d15da3009f0b7164e87282fa Mon Sep 17 00:00:00 2001 From: Akshay Date: Fri, 30 Oct 2020 08:23:38 +0530 Subject: [PATCH 4/5] minor fixes --- .../java/com/clevertap/stormdb/StormDB.java | 9 ++++ .../com/clevertap/stormdb/StormDBTest.java | 53 +++++++++++++++---- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/clevertap/stormdb/StormDB.java b/src/main/java/com/clevertap/stormdb/StormDB.java index 19ab163..8de54d0 100644 --- a/src/main/java/com/clevertap/stormdb/StormDB.java +++ b/src/main/java/com/clevertap/stormdb/StormDB.java @@ -295,6 +295,15 @@ private void buildIndexFromFile(boolean isWal) throws IOException { try { reader.readFromFile(wrapper, false, entry -> { final int key = entry.getInt(); + + if (key == DELETED_KEY_MARKER) { + final int deletedKey = entry.getInt(); + index.remove(deletedKey); + dataInWalFile.clear(deletedKey); + fileIndex[0]++; + return; + } + index.put(key, fileIndex[0]++); if (isWal) { dataInWalFile.set(key); diff --git a/src/test/java/com/clevertap/stormdb/StormDBTest.java b/src/test/java/com/clevertap/stormdb/StormDBTest.java index a3e74fb..3dfe73c 100644 --- a/src/test/java/com/clevertap/stormdb/StormDBTest.java +++ b/src/test/java/com/clevertap/stormdb/StormDBTest.java @@ -768,6 +768,8 @@ void testKeysDeletionNormal() throws IOException, StormDBException, InterruptedE db.put(i, value.array()); } + assertEquals(totalEntries, db.size()); + int[] total = {0}; db.iterate((key, data, offset) -> { @@ -777,25 +779,48 @@ void testKeysDeletionNormal() throws IOException, StormDBException, InterruptedE }); assertEquals(totalEntries, total[0]); - // now delete every even number - total[0] = 0; // simple deletion for (int i = 1;i <= totalEntries; i++) { if (i%2 == 0) { db.remove(i); } + if (i % 1000 == 0) { + value.clear(); + value.putInt(i); + db.put(i, value.array()); + } } + + assertNotNull(db.randomGet(1)); + assertNull(db.randomGet(2)); + assertEquals(totalEntries / 2 + 500, db.size()); // checking db size + + db.close(); + + db = buildDB(path, valueSize); + total[0] = 0; + db.iterate((key, data, offset) -> { + if (key % 2 == 0) { + assertEquals(0, key % 1000); + } ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); - assertEquals(key, dataValue.getInt()); + assertEquals(key, dataValue.getInt()); total[0]++; }); - assertEquals(totalEntries/2, total[0]); + + assertNull(db.randomGet(4)); + for (int i=1000;i < totalEntries; i+= 1000) { + value.clear(); + value.putInt(i); + assertArrayEquals(value.array(), db.randomGet(i)); + } + assertEquals(totalEntries / 2 + 500, total[0]); } @Test - public void testKeysDeletionComplex() throws IOException, StormDBException { + void testKeysDeletionComplex() throws IOException, StormDBException { final int valueSize = 28; final Path path = Files.createTempDirectory("stormdb"); StormDB db = buildDB(path, valueSize); @@ -803,13 +828,14 @@ public void testKeysDeletionComplex() throws IOException, StormDBException { ByteBuffer value = ByteBuffer.allocate(valueSize); value.putInt(1); db.put(1, value.array()); + assertEquals(1, db.size()); assertArrayEquals(value.array(), db.randomGet(1)); db.remove(1); + assertEquals(0, db.size()); assertNull(db.randomGet(1)); - db.put(1, value.array()); - assertArrayEquals(value.array(), db.randomGet(1)); + db.compact(); int[] total = {0}; db.iterate((key, data, offset)->{ @@ -817,11 +843,20 @@ public void testKeysDeletionComplex() throws IOException, StormDBException { assertEquals(key, dataValue.getInt()); total[0]++; }); + assertEquals(0, total[0]); + + total[0] = 0; + db.put(1, value.array()); + db.iterate((key, data, offset)->{ + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); assertEquals(1, total[0]); } @Test - public void testDeletionExceptions() throws IOException { + void testDeletionExceptions() throws IOException { final int valueSize = 28; final Path path = Files.createTempDirectory("stormdb"); StormDB db = buildDB(path, valueSize); @@ -834,8 +869,8 @@ public void testDeletionExceptions() throws IOException { db.remove(StormDB.RESERVED_KEY_MARKER); }); + db.exceptionDuringBackgroundOps = new Exception("testing"); assertThrows(StormDBRuntimeException.class, () -> { - db.exceptionDuringBackgroundOps = new Exception("testing"); db.remove(2); }); } From 1da1da3f1ff31646a6146f86ac00aa31d146df32 Mon Sep 17 00:00:00 2001 From: Akshay Date: Fri, 30 Oct 2020 09:23:26 +0530 Subject: [PATCH 5/5] minor test fixes --- .../java/com/clevertap/stormdb/StormDB.java | 5 + .../com/clevertap/stormdb/StormDBTest.java | 95 +++++++++++++------ 2 files changed, 70 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/clevertap/stormdb/StormDB.java b/src/main/java/com/clevertap/stormdb/StormDB.java index 8de54d0..18132e3 100644 --- a/src/main/java/com/clevertap/stormdb/StormDB.java +++ b/src/main/java/com/clevertap/stormdb/StormDB.java @@ -569,6 +569,11 @@ public void remove(int key) throws IOException { throw new ReservedKeyException(key); } + final int recordIndexForKey = index.get(key); + if (recordIndexForKey == RESERVED_KEY_MARKER) { + return; // no deletion + } + rwLock.writeLock().lock(); try { diff --git a/src/test/java/com/clevertap/stormdb/StormDBTest.java b/src/test/java/com/clevertap/stormdb/StormDBTest.java index 3dfe73c..d54cabe 100644 --- a/src/test/java/com/clevertap/stormdb/StormDBTest.java +++ b/src/test/java/com/clevertap/stormdb/StormDBTest.java @@ -754,21 +754,25 @@ void testInMemoryUpdate() throws IOException, StormDBException { assertEquals(3, db.size()); } + private byte[] getByteArrayForValueWithValueSize(int value, int valueSize) { + ByteBuffer valueBuffer = ByteBuffer.allocate(valueSize); + valueBuffer.putInt(value); + return valueBuffer.array(); + } + @Test - void testKeysDeletionNormal() throws IOException, StormDBException, InterruptedException{ + void testKeysDeletionWithoutCompactionAndClose() throws IOException, StormDBException { final int valueSize = 28; final Path path = Files.createTempDirectory("stormdb"); StormDB db = buildDB(path, valueSize); // insert some keys int totalEntries = 500000; - ByteBuffer value = ByteBuffer.allocate(valueSize); + for (int i = 1; i <= totalEntries; i++) { - value.clear(); - value.putInt(i); - db.put(i, value.array()); + db.put(i, getByteArrayForValueWithValueSize(i, valueSize)); } - assertEquals(totalEntries, db.size()); + assertEquals(totalEntries, db.size()); // db size should match int[] total = {0}; @@ -785,51 +789,72 @@ void testKeysDeletionNormal() throws IOException, StormDBException, InterruptedE if (i%2 == 0) { db.remove(i); } - if (i % 1000 == 0) { - value.clear(); - value.putInt(i); - db.put(i, value.array()); - } } assertNotNull(db.randomGet(1)); assertNull(db.randomGet(2)); - assertEquals(totalEntries / 2 + 500, db.size()); // checking db size + assertEquals(totalEntries / 2, db.size()); // db size should become half + } - db.close(); + @Test + void testKeysDeletionWithDBClose() throws IOException, StormDBException, InterruptedException{ + final int valueSize = 28; + final Path path = Files.createTempDirectory("stormdb"); + StormDB db = buildDB(path, valueSize); + + int totalEntries = 100; + for (int i = 1; i <= totalEntries; i++) { + db.put(i, getByteArrayForValueWithValueSize(i, valueSize)); + } + + assertEquals(totalEntries, db.size()); + + for (int i = 1; i <= totalEntries; i++) { + if (i % 3 == 0) { + db.remove(i); + } + } + + db.close(); // closing db + + // now the deleted keys should maintain their state db = buildDB(path, valueSize); - total[0] = 0; + assertEquals(totalEntries - totalEntries / 3, db.size()); + + int[] total = {0}; + // checking by iterating db.iterate((key, data, offset) -> { - if (key % 2 == 0) { - assertEquals(0, key % 1000); - } ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); - assertEquals(key, dataValue.getInt()); + assertEquals(key, dataValue.getInt()); total[0]++; }); - assertNull(db.randomGet(4)); - for (int i=1000;i < totalEntries; i+= 1000) { - value.clear(); - value.putInt(i); - assertArrayEquals(value.array(), db.randomGet(i)); + assertEquals(totalEntries - totalEntries / 3, total[0]); + + // checking with random get also + for (int i = 1; i <= totalEntries; i++) { + if (i % 3 == 0) { + assertNull(db.randomGet(i)); + } else { + ByteBuffer dataValue = ByteBuffer.wrap(db.randomGet(i), 0, valueSize); + assertEquals(i, dataValue.getInt()); + } } - assertEquals(totalEntries / 2 + 500, total[0]); } @Test - void testKeysDeletionComplex() throws IOException, StormDBException { + void testKeysDeletionCompact() throws IOException, StormDBException { final int valueSize = 28; final Path path = Files.createTempDirectory("stormdb"); StormDB db = buildDB(path, valueSize); - ByteBuffer value = ByteBuffer.allocate(valueSize); - value.putInt(1); - db.put(1, value.array()); + db.remove(1); // key = 1 doesn't exist + + db.put(1, getByteArrayForValueWithValueSize(1, valueSize)); assertEquals(1, db.size()); - assertArrayEquals(value.array(), db.randomGet(1)); + assertArrayEquals(getByteArrayForValueWithValueSize(1, valueSize), db.randomGet(1)); db.remove(1); assertEquals(0, db.size()); @@ -846,13 +871,23 @@ void testKeysDeletionComplex() throws IOException, StormDBException { assertEquals(0, total[0]); total[0] = 0; - db.put(1, value.array()); + db.put(1, getByteArrayForValueWithValueSize(1, valueSize)); db.iterate((key, data, offset)->{ ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); assertEquals(key, dataValue.getInt()); total[0]++; }); assertEquals(1, total[0]); + + total[0] = 0; + db.remove(1); + db.iterate((key, data, offset)->{ + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + assertEquals(0, total[0]); + assertEquals(0, db.size()); } @Test