133 changes: 51 additions & 82 deletions src/main/java/com/clevertap/stormdb/Buffer.java
@@ -5,6 +5,7 @@
import static com.clevertap.stormdb.Config.KEY_SIZE;
import static com.clevertap.stormdb.Config.RECORDS_PER_BLOCK;

import com.clevertap.stormdb.exceptions.BufferFullException;
import com.clevertap.stormdb.exceptions.ReadOnlyBufferException;
import com.clevertap.stormdb.exceptions.StormDBRuntimeException;
import com.clevertap.stormdb.exceptions.ValueSizeTooLargeException;
@@ -18,6 +19,7 @@
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.CRC32;
import java.util.ArrayList;

/**
* The {@link Buffer} is a logical extension of the WAL file. For a random get, if the index points
@@ -27,11 +29,9 @@
public class Buffer {

private ByteBuffer byteBuffer;
private final int valueSize;
private final int recordSize;
private final boolean readOnly;
private final Config dbConfig;
private final int maxRecords;


/**
* Initialises a write buffer for the WAL file with the following specification:
@@ -48,41 +48,19 @@ public class Buffer {
* @param readOnly Whether buffer is read only.
*/
Buffer(final Config dbConfig, final boolean readOnly) {
this.valueSize = dbConfig.getValueSize();
this.recordSize = valueSize + KEY_SIZE;
this.readOnly = readOnly;
this.dbConfig = dbConfig;
if (valueSize > Config.MAX_VALUE_SIZE) {
throw new ValueSizeTooLargeException();
}

this.maxRecords = calculateMaxRecords();

final int blocks = this.maxRecords / RECORDS_PER_BLOCK;

// Each block will have 1 CRC and 1 sync marker (the sync marker is one kv pair)
final int writeBufferSize = blocks * RECORDS_PER_BLOCK * recordSize
+ (blocks * (CRC_SIZE + recordSize));

// For variable-length, just use max buffer size directly
final int writeBufferSize = dbConfig.getMaxBufferSize();
byteBuffer = ByteBuffer.allocate(writeBufferSize);
}


int capacity() {
return byteBuffer.capacity();
}

int calculateMaxRecords() {
int recordsToBuffer = Math.max(dbConfig.getMaxBufferSize() / recordSize, RECORDS_PER_BLOCK);

// Get to the nearest multiple of 128.
recordsToBuffer = (recordsToBuffer / RECORDS_PER_BLOCK) * RECORDS_PER_BLOCK;
return recordsToBuffer;
}

public int getMaxRecords() {
return maxRecords;
}

int getWriteBufferSize() {
return byteBuffer.capacity();
}
@@ -92,21 +70,17 @@ int flush(final OutputStream out) throws IOException {
throw new ReadOnlyBufferException("Initialised in read only mode!");
}

// If buffer is empty, nothing to flush
if (byteBuffer.position() == 0) {
return 0;
}

// Fill the block with the last record, if required.
while ((RecordUtil.addressToIndex(recordSize, byteBuffer.position()))
% RECORDS_PER_BLOCK != 0) {
final int key = byteBuffer.getInt(byteBuffer.position() - recordSize);
add(key, byteBuffer.array(), byteBuffer.position() - recordSize + KEY_SIZE);
}

final int bytes = byteBuffer.position();
out.write(byteBuffer.array(), 0, bytes);
// Write all buffer contents directly to output stream
final int bytesToWrite = byteBuffer.position();
out.write(byteBuffer.array(), 0, bytesToWrite);
out.flush();
return bytes;

return bytesToWrite;
}

void readFromFiles(List<RandomAccessFile> files,
@@ -115,32 +89,40 @@ void readFromFiles(List<RandomAccessFile> files,
readFromFile(file, reverse, recordConsumer);
}
}

/**
* Read variable-length records from a file, supporting both forward and backward iteration.
* No longer depends on fixed block sizes - works directly with variable-length records.
*/
void readFromFile(final RandomAccessFile file, final boolean reverse,
final Consumer<ByteBuffer> recordConsumer)
throws IOException {
final int blockSize = RecordUtil.blockSizeWithTrailer(recordSize);
final Consumer<ByteBuffer> recordConsumer) throws IOException {

if (reverse) {
if (file.getFilePointer() % blockSize != 0) {
throw new StormDBRuntimeException("Inconsistent data for iteration!");
}

// This is needed for compaction: newer records (at end) should overwrite older ones
while (file.getFilePointer() != 0) {
byteBuffer.clear();
final long validBytesRemaining = file.getFilePointer() - byteBuffer.capacity();
file.seek(Math.max(validBytesRemaining, 0));

final long currentPosition = file.getFilePointer();
final long bytesToRead = Math.min(currentPosition, byteBuffer.capacity());
final long seekPosition = currentPosition - bytesToRead;

// Seek to the start position for this chunk
file.seek(seekPosition);

// Read the chunk and process variable-length records
fillBuffer(file, recordConsumer, true);

// Set the position again, since the read op moved the cursor ahead.
file.seek(Math.max(validBytesRemaining, 0));
// Move file pointer back for next iteration
file.seek(seekPosition);
}
} else {
// Read file forwards until end
// This is used for normal iteration and recovery
while (true) {
byteBuffer.clear();
final int bytesRead = fillBuffer(file, recordConsumer, false);
if (bytesRead < blockSize) {

// Stop when we reach end of file (no more data to read)
if (bytesRead == 0) {
break;
}
}
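
Review note: the reverse branch now walks the WAL backwards one buffer-sized chunk at a time instead of block-by-block. Below is a minimal sketch of just the seek arithmetic, under the assumption that `fillBuffer` copes with a record straddling a chunk boundary (names here are illustrative, not the actual `Buffer` API):

```java
import java.io.IOException;
import java.io.RandomAccessFile;

final class ReverseChunkWalk {
    // Walks a file backwards in chunks of up to `chunkCapacity` bytes,
    // mirroring the seek arithmetic in Buffer#readFromFile(reverse = true).
    static void walkBackwards(RandomAccessFile file, int chunkCapacity) throws IOException {
        file.seek(file.length());                        // start at the end of the file
        final byte[] chunk = new byte[chunkCapacity];
        while (file.getFilePointer() != 0) {
            final long currentPosition = file.getFilePointer();
            final long bytesToRead = Math.min(currentPosition, chunkCapacity);
            final long seekPosition = currentPosition - bytesToRead;

            file.seek(seekPosition);                     // jump to the start of this chunk
            file.readFully(chunk, 0, (int) bytesToRead); // stand-in for fillBuffer(...)

            file.seek(seekPosition);                     // rewind: the read moved the cursor forward
        }
    }
}
```

Each iteration shrinks the file pointer by exactly `bytesToRead`, so the loop terminates at offset 0; a record split across two chunks is the case the real `fillBuffer` has to handle.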
@@ -179,28 +161,33 @@ boolean isFull() {
return byteBuffer.remaining() == 0; // Perfect alignment, so this works.
}

int add(int key, byte[] value, int valueOffset) {
int add(long key, byte[] value, int valueOffset, int valueLength) {
if (readOnly) {
throw new ReadOnlyBufferException("Initialised in read only mode!");
}

if (byteBuffer.position() % RecordUtil.blockSizeWithTrailer(recordSize) == 0) {
insertSyncMarker();
Config.validateValueLength(valueLength);

// Check if we have space for this record
int recordSize = Config.calculateRecordSize(valueLength);
if (byteBuffer.remaining() < recordSize) {
throw new BufferFullException(recordSize, byteBuffer.remaining());
}

final int address = byteBuffer.position();

byteBuffer.putInt(key);
byteBuffer.put(value, valueOffset, valueSize);
// Write variable-length record: [Key:8][Length:4][Value:variable][Length:4]
byteBuffer.putLong(key); // Key (8 bytes)
byteBuffer.putInt(valueLength); // Length header (4 bytes)
byteBuffer.put(value, valueOffset, valueLength); // Value (variable bytes)
byteBuffer.putInt(valueLength); // Length footer (4 bytes)

// Should we close this block?
// Don't close the block if we're adding the sync marker kv pair.
final int nextRecordIndex = RecordUtil.addressToIndex(recordSize, byteBuffer.position());
if (nextRecordIndex % RECORDS_PER_BLOCK == 0) {
closeBlock();
}
return address;
}
int add(long key, byte[] value, int valueOffset) {
return add(key, value, valueOffset, value.length - valueOffset);
}
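
Review note: for reference, the inverse of the write above — parsing one record out of the `[Key:8][Length:4][Value:variable][Length:4]` layout. A sketch assuming the buffer is positioned at a record start; the header/footer comparison is how I'd expect torn writes to surface, though this PR doesn't show the read side:

```java
import java.nio.ByteBuffer;

final class RecordReader {
    // Parses one variable-length record: [Key:8][Length:4][Value:N][Length:4].
    // Returns the value bytes; throws if the header and footer lengths disagree.
    static byte[] readRecord(ByteBuffer buf, long[] keyOut) {
        final long key = buf.getLong();          // 8-byte key
        final int headerLength = buf.getInt();   // 4-byte length header
        final byte[] value = new byte[headerLength];
        buf.get(value);                          // variable-length value
        final int footerLength = buf.getInt();   // 4-byte length footer
        if (headerLength != footerLength) {
            throw new IllegalStateException(
                    "Torn record: header " + headerLength + " != footer " + footerLength);
        }
        keyOut[0] = key;
        return value;
    }
}
```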


/**
* Attempts to update a key in the in-memory buffer after verifying the key.
@@ -211,8 +198,8 @@ int add(int key, byte[] value, int valueOffset) {
* @param addressInBuffer The address in the buffer at which the key value pair exists
* @return true if the update succeeds after key verification, false otherwise
*/
boolean update(int key, byte[] newValue, int valueOffset, int addressInBuffer) {
int savedKey = byteBuffer.getInt(addressInBuffer);
boolean update(long key, byte[] newValue, int valueOffset, int addressInBuffer) {
long savedKey = byteBuffer.getLong(addressInBuffer);
if (savedKey != key) {
return false;
}
@@ -260,24 +247,6 @@ public ByteBuffer nextElement() {
};
}

private void closeBlock() {
final CRC32 crc32 = new CRC32();
final int blockSize = recordSize * RECORDS_PER_BLOCK;
crc32.update(byteBuffer.array(), byteBuffer.position() - blockSize, blockSize);
byteBuffer.putInt((int) crc32.getValue());
}

static byte[] getSyncMarker(final int valueSize) {
final ByteBuffer syncMarker = ByteBuffer.allocate(valueSize + KEY_SIZE);
Arrays.fill(syncMarker.array(), (byte) 0xFF);
syncMarker.putInt(RESERVED_KEY_MARKER); // This will override the first four bytes.
return syncMarker.array();
}

protected void insertSyncMarker() {
byteBuffer.put(getSyncMarker(valueSize));
}

void clear() {
byteBuffer = ByteBuffer.allocate(byteBuffer.capacity());
}
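Review note: with sync markers and CRC blocks gone, the caller owns the flush-on-full cycle. A sketch of the intended write path, assuming package-level access and that `Config`'s defaults yield a usable buffer size (the `Config` setup below is hypothetical; the real path presumably goes through a builder):

```java
package com.clevertap.stormdb;

import com.clevertap.stormdb.exceptions.BufferFullException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class BufferWritePathSketch {
    public static void main(String[] args) throws IOException {
        final Config config = new Config();
        config.dbDir = "/tmp/stormdb";   // hypothetical direct setup for the sketch

        final Buffer buffer = new Buffer(config, /* readOnly */ false);
        final byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            buffer.add(42L, value, 0, value.length);   // writes [Key:8][Len:4][Value][Len:4]
        } catch (BufferFullException e) {
            // add() no longer auto-closes blocks, so on a full buffer the
            // caller flushes, clears, and retries.
            try (FileOutputStream out = new FileOutputStream("/tmp/stormdb/wal")) {
                buffer.flush(out);
            }
            buffer.clear();
            buffer.add(42L, value, 0, value.length);
        }
    }
}
```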
6 changes: 4 additions & 2 deletions src/main/java/com/clevertap/stormdb/CompactionState.java
@@ -1,5 +1,7 @@
package com.clevertap.stormdb;

import com.clevertap.stormdb.utils.BitSetLong;

import java.io.File;
import java.util.BitSet;

@@ -9,8 +11,8 @@ class CompactionState {

long nextFileRecordIndex;

BitSet dataInNextFile = new BitSet();
BitSet dataInNextWalFile = new BitSet();
BitSetLong dataInNextFile = new BitSetLong(); // REVIEW: could we change these to a Set<Long>, or any other set that accepts longs?
BitSetLong dataInNextWalFile = new BitSetLong();

File nextWalFile;
File nextDataFile;
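Review note: on the `Set<Long>` question above — a sketch of the `HashSet<Long>` variant. It simplifies the API but boxes every key, so a bitset (or a primitive-long set such as fastutil's) stays much cheaper when record indices are dense:

```java
import java.util.HashSet;
import java.util.Set;

class CompactionStateWithSets {
    long nextFileRecordIndex;

    // Set<Long> variant of the membership tracking: correct, but every key is
    // boxed, whereas a bitset stores one bit per index.
    final Set<Long> dataInNextFile = new HashSet<>();
    final Set<Long> dataInNextWalFile = new HashSet<>();

    void recordInNextFile(long recordIndex) {
        dataInNextFile.add(recordIndex);
    }

    boolean isRecordedInNextFile(long recordIndex) {
        return dataInNextFile.contains(recordIndex);
    }
}
```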
45 changes: 38 additions & 7 deletions src/main/java/com/clevertap/stormdb/Config.java
@@ -10,7 +10,16 @@ public class Config {
public static final int CRC_SIZE = 4; // CRC32. Not configurable for now.

// Key size
static final int KEY_SIZE = 4; // Not Configurable for now.
static final int KEY_SIZE = 8; // Not Configurable for now.

// Value meta data size
public static final int VALUE_LENGTH_SIZE = 4;

public static final int RECORD_HEADER_SIZE = KEY_SIZE + VALUE_LENGTH_SIZE; // 12 bytes
public static final int RECORD_FOOTER_SIZE = VALUE_LENGTH_SIZE; // 4 bytes

public static final int MAX_VALUE_SIZE = 8192; // reduced from 512 * 1024
public static final int MIN_VALUE_SIZE = 1; // 1 byte minimum

// Compaction parameter defaults
private static final long DEFAULT_COMPACTION_WAIT_TIMEOUT_MS =
@@ -32,7 +41,7 @@ public class Config {
* <p>
* Note: The hard limit is due to the fact that {@link ByteBuffer} accepts an int as its size.
*/
static final int MAX_VALUE_SIZE = 512 * 1024; // Not configurable for now.
// static final int MAX_VALUE_SIZE = 512 * 1024; // Not configurable for now.

// File open fd parameter defaults and range
private static final int DEFAULT_OPEN_FD_COUNT = 10;
@@ -41,7 +50,6 @@

// Must have parameters
boolean autoCompact = true;
int valueSize;
String dbDir;

// Other parameters
@@ -57,10 +65,6 @@ public boolean autoCompactEnabled() {
return autoCompact;
}

public int getValueSize() {
return valueSize;
}

public String getDbDir() {
return dbDir;
}
@@ -99,4 +103,31 @@ public int getOpenFDCount() {

public IndexMap getIndexMap() { return indexMap; }

public static int calculateRecordSize(int valueLength) {
validateValueLength(valueLength);
return RECORD_HEADER_SIZE + valueLength + RECORD_FOOTER_SIZE;
}

public int getValueSize() {
throw new UnsupportedOperationException(
"getValueSize() not supported in variable-length mode. Use getMaxValueSize() or calculate per value.");
}

public static int getMaxNodeSize() {
return calculateRecordSize(MAX_VALUE_SIZE);
}

public static int getMinNodeSize() {
return calculateRecordSize(MIN_VALUE_SIZE);
}

public static void validateValueLength(int valueLength) {
if (valueLength < MIN_VALUE_SIZE || valueLength > MAX_VALUE_SIZE) {
throw new IllegalArgumentException(
"Value length " + valueLength + " outside valid range [" +
MIN_VALUE_SIZE + ", " + MAX_VALUE_SIZE + "]");
}
}


}
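
Review note: the size arithmetic these helpers encode, worked through for a 100-byte value:

```java
import com.clevertap.stormdb.Config;

final class RecordSizeMath {
    public static void main(String[] args) {
        // RECORD_HEADER_SIZE = KEY_SIZE (8) + VALUE_LENGTH_SIZE (4) = 12 bytes
        // RECORD_FOOTER_SIZE = VALUE_LENGTH_SIZE (4) = 4 bytes
        System.out.println(Config.calculateRecordSize(100)); // 12 + 100  + 4 = 116
        System.out.println(Config.getMaxNodeSize());         // 12 + 8192 + 4 = 8208
        System.out.println(Config.getMinNodeSize());         // 12 + 1    + 4 = 17

        try {
            Config.validateValueLength(0);                   // below MIN_VALUE_SIZE
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());              // "Value length 0 outside valid range [1, 8192]"
        }
    }
}
```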
2 changes: 1 addition & 1 deletion src/main/java/com/clevertap/stormdb/EntryConsumer.java
@@ -7,5 +7,5 @@
*/
public interface EntryConsumer {

void accept(final int key, final byte[] data, final int offset) throws IOException;
void accept(final long key, final byte[] data, final int offset) throws IOException;
}
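
Review note: the int-to-long key widening ripples to every `EntryConsumer` call site. A minimal implementation against the new signature — note the interface still carries no valueLength parameter, so with variable-length values the caller presumably sizes `data` per record (an assumption worth confirming in this PR):

```java
import com.clevertap.stormdb.EntryConsumer;

final class PrintingConsumer implements EntryConsumer {
    @Override
    public void accept(final long key, final byte[] data, final int offset) {
        // No fixed value width to read past `offset` anymore; we only know
        // how many bytes the caller handed us.
        System.out.println("key=" + key + " payloadBytes=" + (data.length - offset));
    }
}
```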