Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions src/main/java/com/clevertap/stormdb/StormDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
public class StormDB {

public static final int RESERVED_KEY_MARKER = 0xffffffff;
public static final int DELETED_KEY_MARKER = 0x80000000;

private static final String FILE_NAME_DATA = "data";
private static final String FILE_NAME_WAL = "wal";
Expand Down Expand Up @@ -294,6 +295,15 @@ private void buildIndexFromFile(boolean isWal) throws IOException {
try {
reader.readFromFile(wrapper, false, entry -> {
final int key = entry.getInt();

if (key == DELETED_KEY_MARKER) {
final int deletedKey = entry.getInt();
index.remove(deletedKey);
dataInWalFile.clear(deletedKey);
fileIndex[0]++;
return;
}

index.put(key, fileIndex[0]++);
if (isWal) {
dataInWalFile.set(key);
Expand Down Expand Up @@ -496,9 +506,10 @@ public void put(int key, byte[] value, int valueOffset) throws IOException {
+ "last compaction resulted in an exception!", exceptionDuringBackgroundOps);
}

if (key == RESERVED_KEY_MARKER) {
throw new ReservedKeyException(RESERVED_KEY_MARKER);
if (key == RESERVED_KEY_MARKER || key == DELETED_KEY_MARKER) {
throw new ReservedKeyException(key);
}

rwLock.writeLock().lock();

try {
Expand Down Expand Up @@ -546,6 +557,44 @@ public void put(int key, byte[] value, int valueOffset) throws IOException {
}
}


public void remove(int key) throws IOException {

if (exceptionDuringBackgroundOps != null) {
throw new StormDBRuntimeException("Will not accept any further writes since the "
+ "last compaction resulted in an exception!", exceptionDuringBackgroundOps);
}

if (key == RESERVED_KEY_MARKER || key == DELETED_KEY_MARKER) {
throw new ReservedKeyException(key);
}

final int recordIndexForKey = index.get(key);
if (recordIndexForKey == RESERVED_KEY_MARKER) {
return; // no deletion
}

rwLock.writeLock().lock();
try {

index.remove(key);

ByteBuffer deletedKeyBuffer = ByteBuffer.allocate(conf.getValueSize());
deletedKeyBuffer.putInt(key);

if (buffer.isFull()) {
flush();
synchronized (compactionSync) {
compactionSync.notifyAll();
}
}

buffer.add(DELETED_KEY_MARKER, deletedKeyBuffer.array(), 0);
} finally {
rwLock.writeLock().unlock();
}
}

public void flush() throws IOException {
rwLock.writeLock().lock();
try {
Expand Down Expand Up @@ -613,6 +662,14 @@ private void iterate(final boolean useLatestWalFile, final boolean readInMemoryB

final Consumer<ByteBuffer> entryConsumer = entry -> {
final int key = entry.getInt();

// deleted key logic
if (key == DELETED_KEY_MARKER) {
final int deletedKey = entry.getInt();
keysRead.set(deletedKey);
return;
}

final boolean b = keysRead.get(key);
if (!b) {
try {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ public int get(int key) {
public int size() {
return indexMap.size();
}

@Override
public int remove(int key) { return indexMap.remove(key); }
}
7 changes: 7 additions & 0 deletions src/main/java/com/clevertap/stormdb/maps/IndexMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,11 @@ public interface IndexMap {
* @return Size of the index.
*/
int size();

/**
* API to remove a particular key from the database return {@link StormDB#RESERVED_KEY_MARKER}
* @param key The key which needs to be removed
* @return the return value stored for that key
*/
int remove(int key);
}
3 changes: 3 additions & 0 deletions src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public int get(int key) {
public int size() {
return kvCache.size();
}

@Override
public int remove(int key) { return kvCache.remove(key); }
})
.build();

Expand Down
156 changes: 156 additions & 0 deletions src/test/java/com/clevertap/stormdb/StormDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -753,4 +753,160 @@ void testInMemoryUpdate() throws IOException, StormDBException {
});
assertEquals(3, db.size());
}

private byte[] getByteArrayForValueWithValueSize(int value, int valueSize) {
ByteBuffer valueBuffer = ByteBuffer.allocate(valueSize);
valueBuffer.putInt(value);
return valueBuffer.array();
}

@Test
void testKeysDeletionWithoutCompactionAndClose() throws IOException, StormDBException {
final int valueSize = 28;
final Path path = Files.createTempDirectory("stormdb");
StormDB db = buildDB(path, valueSize);
// insert some keys
int totalEntries = 500000;

for (int i = 1; i <= totalEntries; i++) {
db.put(i, getByteArrayForValueWithValueSize(i, valueSize));
}

assertEquals(totalEntries, db.size()); // db size should match

int[] total = {0};

db.iterate((key, data, offset) -> {
ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize);
assertEquals(key, dataValue.getInt());
total[0]++;
});

assertEquals(totalEntries, total[0]);

// simple deletion
for (int i = 1;i <= totalEntries; i++) {
if (i%2 == 0) {
db.remove(i);
}
}

assertNotNull(db.randomGet(1));
assertNull(db.randomGet(2));
assertEquals(totalEntries / 2, db.size()); // db size should become half
}

@Test
void testKeysDeletionWithDBClose() throws IOException, StormDBException, InterruptedException{
final int valueSize = 28;
final Path path = Files.createTempDirectory("stormdb");
StormDB db = buildDB(path, valueSize);

int totalEntries = 100;
for (int i = 1; i <= totalEntries; i++) {
db.put(i, getByteArrayForValueWithValueSize(i, valueSize));
}

assertEquals(totalEntries, db.size());

for (int i = 1; i <= totalEntries; i++) {
if (i % 3 == 0) {
db.remove(i);
}
}

db.close(); // closing db

// now the deleted keys should maintain their state

db = buildDB(path, valueSize);

assertEquals(totalEntries - totalEntries / 3, db.size());

int[] total = {0};
// checking by iterating
db.iterate((key, data, offset) -> {
ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize);
assertEquals(key, dataValue.getInt());
total[0]++;
});

assertEquals(totalEntries - totalEntries / 3, total[0]);

// checking with random get also
for (int i = 1; i <= totalEntries; i++) {
if (i % 3 == 0) {
assertNull(db.randomGet(i));
} else {
ByteBuffer dataValue = ByteBuffer.wrap(db.randomGet(i), 0, valueSize);
assertEquals(i, dataValue.getInt());
}
}
}

@Test
void testKeysDeletionCompact() throws IOException, StormDBException {
final int valueSize = 28;
final Path path = Files.createTempDirectory("stormdb");
StormDB db = buildDB(path, valueSize);

db.remove(1); // key = 1 doesn't exist

db.put(1, getByteArrayForValueWithValueSize(1, valueSize));
assertEquals(1, db.size());
assertArrayEquals(getByteArrayForValueWithValueSize(1, valueSize), db.randomGet(1));

db.remove(1);
assertEquals(0, db.size());
assertNull(db.randomGet(1));

db.compact();

int[] total = {0};
db.iterate((key, data, offset)->{
ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize);
assertEquals(key, dataValue.getInt());
total[0]++;
});
assertEquals(0, total[0]);

total[0] = 0;
db.put(1, getByteArrayForValueWithValueSize(1, valueSize));
db.iterate((key, data, offset)->{
ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize);
assertEquals(key, dataValue.getInt());
total[0]++;
});
assertEquals(1, total[0]);

total[0] = 0;
db.remove(1);
db.iterate((key, data, offset)->{
ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize);
assertEquals(key, dataValue.getInt());
total[0]++;
});
assertEquals(0, total[0]);
assertEquals(0, db.size());
}

@Test
void testDeletionExceptions() throws IOException {
final int valueSize = 28;
final Path path = Files.createTempDirectory("stormdb");
StormDB db = buildDB(path, valueSize);

assertThrows(ReservedKeyException.class, () -> {
db.remove(StormDB.DELETED_KEY_MARKER);
});

assertThrows(ReservedKeyException.class, () -> {
db.remove(StormDB.RESERVED_KEY_MARKER);
});

db.exceptionDuringBackgroundOps = new Exception("testing");
assertThrows(StormDBRuntimeException.class, () -> {
db.remove(2);
});
}
}