diff --git a/src/main/java/com/clevertap/stormdb/Buffer.java b/src/main/java/com/clevertap/stormdb/Buffer.java index 8ddc038..636f65e 100644 --- a/src/main/java/com/clevertap/stormdb/Buffer.java +++ b/src/main/java/com/clevertap/stormdb/Buffer.java @@ -5,6 +5,7 @@ import static com.clevertap.stormdb.Config.KEY_SIZE; import static com.clevertap.stormdb.Config.RECORDS_PER_BLOCK; +import com.clevertap.stormdb.exceptions.BufferFullException; import com.clevertap.stormdb.exceptions.ReadOnlyBufferException; import com.clevertap.stormdb.exceptions.StormDBRuntimeException; import com.clevertap.stormdb.exceptions.ValueSizeTooLargeException; @@ -18,6 +19,7 @@ import java.util.List; import java.util.function.Consumer; import java.util.zip.CRC32; +import java.util.ArrayList; /** * The {@link Buffer} is a logical extension of the WAL file. For a random get, if the index points @@ -27,11 +29,9 @@ public class Buffer { private ByteBuffer byteBuffer; - private final int valueSize; - private final int recordSize; private final boolean readOnly; private final Config dbConfig; - private final int maxRecords; + /** * Initialises a write buffer for the WAL file with the following specification: @@ -48,41 +48,19 @@ public class Buffer { * @param readOnly Whether buffer is read only. */ Buffer(final Config dbConfig, final boolean readOnly) { - this.valueSize = dbConfig.getValueSize(); - this.recordSize = valueSize + KEY_SIZE; this.readOnly = readOnly; this.dbConfig = dbConfig; - if (valueSize > Config.MAX_VALUE_SIZE) { - throw new ValueSizeTooLargeException(); - } - - this.maxRecords = calculateMaxRecords(); - - final int blocks = this.maxRecords / RECORDS_PER_BLOCK; - - // Each block will have 1 CRC and 1 sync marker (the sync marker is one kv pair) - final int writeBufferSize = blocks * RECORDS_PER_BLOCK * recordSize - + (blocks * (CRC_SIZE + recordSize)); + // For variable-length, just use max buffer size directly + final int writeBufferSize = dbConfig.getMaxBufferSize(); byteBuffer = ByteBuffer.allocate(writeBufferSize); } + int capacity() { return byteBuffer.capacity(); } - int calculateMaxRecords() { - int recordsToBuffer = Math.max(dbConfig.getMaxBufferSize() / recordSize, RECORDS_PER_BLOCK); - - // Get to the nearest multiple of 128. - recordsToBuffer = (recordsToBuffer / RECORDS_PER_BLOCK) * RECORDS_PER_BLOCK; - return recordsToBuffer; - } - - public int getMaxRecords() { - return maxRecords; - } - int getWriteBufferSize() { return byteBuffer.capacity(); } @@ -92,21 +70,17 @@ int flush(final OutputStream out) throws IOException { throw new ReadOnlyBufferException("Initialised in read only mode!"); } + // If buffer is empty, nothing to flush if (byteBuffer.position() == 0) { return 0; } - // Fill the block with the last record, if required. - while ((RecordUtil.addressToIndex(recordSize, byteBuffer.position())) - % RECORDS_PER_BLOCK != 0) { - final int key = byteBuffer.getInt(byteBuffer.position() - recordSize); - add(key, byteBuffer.array(), byteBuffer.position() - recordSize + KEY_SIZE); - } - - final int bytes = byteBuffer.position(); - out.write(byteBuffer.array(), 0, bytes); + // Write all buffer contents directly to output stream + final int bytesToWrite = byteBuffer.position(); + out.write(byteBuffer.array(), 0, bytesToWrite); out.flush(); - return bytes; + + return bytesToWrite; } void readFromFiles(List files, @@ -115,32 +89,40 @@ void readFromFiles(List files, readFromFile(file, reverse, recordConsumer); } } - + /** + * Read variable-length records from a file, supporting both forward and backward iteration. + * No longer depends on fixed block sizes - works directly with variable-length records. + */ void readFromFile(final RandomAccessFile file, final boolean reverse, - final Consumer recordConsumer) - throws IOException { - final int blockSize = RecordUtil.blockSizeWithTrailer(recordSize); + final Consumer recordConsumer) throws IOException { if (reverse) { - if (file.getFilePointer() % blockSize != 0) { - throw new StormDBRuntimeException("Inconsistent data for iteration!"); - } - + // This is needed for compaction: newer records (at end) should overwrite older ones while (file.getFilePointer() != 0) { byteBuffer.clear(); - final long validBytesRemaining = file.getFilePointer() - byteBuffer.capacity(); - file.seek(Math.max(validBytesRemaining, 0)); + final long currentPosition = file.getFilePointer(); + final long bytesToRead = Math.min(currentPosition, byteBuffer.capacity()); + final long seekPosition = currentPosition - bytesToRead; + + // Seek to the start position for this chunk + file.seek(seekPosition); + + // Read the chunk and process variable-length records fillBuffer(file, recordConsumer, true); - // Set the position again, since the read op moved the cursor ahead. - file.seek(Math.max(validBytesRemaining, 0)); + // Move file pointer back for next iteration + file.seek(seekPosition); } } else { + // Read file forwards until end + // This is used for normal iteration and recovery while (true) { byteBuffer.clear(); final int bytesRead = fillBuffer(file, recordConsumer, false); - if (bytesRead < blockSize) { + + // Stop when we reach end of file (no more data to read) + if (bytesRead == 0) { break; } } @@ -179,28 +161,33 @@ boolean isFull() { return byteBuffer.remaining() == 0; // Perfect alignment, so this works. } - int add(int key, byte[] value, int valueOffset) { + int add(long key, byte[] value, int valueOffset, int valueLength) { if (readOnly) { throw new ReadOnlyBufferException("Initialised in read only mode!"); } - if (byteBuffer.position() % RecordUtil.blockSizeWithTrailer(recordSize) == 0) { - insertSyncMarker(); + Config.validateValueLength(valueLength); + + // Check if we have space for this record + int recordSize = Config.calculateRecordSize(valueLength); + if (byteBuffer.remaining() < recordSize) { + throw new BufferFullException(recordSize, byteBuffer.remaining()); } final int address = byteBuffer.position(); - byteBuffer.putInt(key); - byteBuffer.put(value, valueOffset, valueSize); + // Write variable-length record: [Key:8][Length:4][Value:variable][Length:4] + byteBuffer.putLong(key); // Key (8 bytes) + byteBuffer.putInt(valueLength); // Length header (4 bytes) + byteBuffer.put(value, valueOffset, valueLength); // Value (variable bytes) + byteBuffer.putInt(valueLength); // Length footer (4 bytes) - // Should we close this block? - // Don't close the block if the we're adding the sync marker kv pair. - final int nextRecordIndex = RecordUtil.addressToIndex(recordSize, byteBuffer.position()); - if (nextRecordIndex % RECORDS_PER_BLOCK == 0) { - closeBlock(); - } return address; } + int add(long key, byte[] value, int valueOffset) { + return add(key, value, valueOffset, value.length - valueOffset); + } + /** * Attempts to update a key in the in-memory buffer after verifying the key. @@ -211,8 +198,8 @@ int add(int key, byte[] value, int valueOffset) { * @param addressInBuffer The address in the buffer at which the key value pair exists * @return true if the update succeeds after key verification, false otherwise */ - boolean update(int key, byte[] newValue, int valueOffset, int addressInBuffer) { - int savedKey = byteBuffer.getInt(addressInBuffer); + boolean update(long key, byte[] newValue, int valueOffset, int addressInBuffer) { + long savedKey = byteBuffer.getLong(addressInBuffer); if (savedKey != key) { return false; } @@ -260,24 +247,6 @@ public ByteBuffer nextElement() { }; } - private void closeBlock() { - final CRC32 crc32 = new CRC32(); - final int blockSize = recordSize * RECORDS_PER_BLOCK; - crc32.update(byteBuffer.array(), byteBuffer.position() - blockSize, blockSize); - byteBuffer.putInt((int) crc32.getValue()); - } - - static byte[] getSyncMarker(final int valueSize) { - final ByteBuffer syncMarker = ByteBuffer.allocate(valueSize + KEY_SIZE); - Arrays.fill(syncMarker.array(), (byte) 0xFF); - syncMarker.putInt(RESERVED_KEY_MARKER); // This will override the first four bytes. - return syncMarker.array(); - } - - protected void insertSyncMarker() { - byteBuffer.put(getSyncMarker(valueSize)); - } - void clear() { byteBuffer = ByteBuffer.allocate(byteBuffer.capacity()); } diff --git a/src/main/java/com/clevertap/stormdb/CompactionState.java b/src/main/java/com/clevertap/stormdb/CompactionState.java index 74f873b..90397c7 100644 --- a/src/main/java/com/clevertap/stormdb/CompactionState.java +++ b/src/main/java/com/clevertap/stormdb/CompactionState.java @@ -1,5 +1,7 @@ package com.clevertap.stormdb; +import com.clevertap.stormdb.utils.BitSetLong; + import java.io.File; import java.util.BitSet; @@ -9,8 +11,8 @@ class CompactionState { long nextFileRecordIndex; - BitSet dataInNextFile = new BitSet(); - BitSet dataInNextWalFile = new BitSet(); + BitSetLong dataInNextFile = new BitSetLong(); // REVIEW : we can change these to some Set or any other set that accpets long ? + BitSetLong dataInNextWalFile = new BitSetLong(); File nextWalFile; File nextDataFile; diff --git a/src/main/java/com/clevertap/stormdb/Config.java b/src/main/java/com/clevertap/stormdb/Config.java index 79a9316..8e58641 100644 --- a/src/main/java/com/clevertap/stormdb/Config.java +++ b/src/main/java/com/clevertap/stormdb/Config.java @@ -10,7 +10,16 @@ public class Config { public static final int CRC_SIZE = 4; // CRC32. Not configurable for now. // Key size - static final int KEY_SIZE = 4; // Not Configurable for now. + static final int KEY_SIZE = 8; // Not Configurable for now. + + // Value meta data size + public static final int VALUE_LENGTH_SIZE = 4; + + public static final int RECORD_HEADER_SIZE = KEY_SIZE + VALUE_LENGTH_SIZE; // 12 bytes + public static final int RECORD_FOOTER_SIZE = VALUE_LENGTH_SIZE; // 4 bytes + + public static final int MAX_VALUE_SIZE = 8192; // (was 512*1024 as MAX_VALUE_SIZE) + public static final int MIN_VALUE_SIZE = 1; // 1 byte minimum // Compaction parameter defaults private static final long DEFAULT_COMPACTION_WAIT_TIMEOUT_MS = @@ -32,7 +41,7 @@ public class Config { *

* Note: The hard limit is due to the fact that {@link ByteBuffer} accepts an int as its size. */ - static final int MAX_VALUE_SIZE = 512 * 1024; // Not configurable for now. +// static final int MAX_VALUE_SIZE = 512 * 1024; // Not configurable for now. // File open fd parameter defaults and range private static final int DEFAULT_OPEN_FD_COUNT = 10; @@ -41,7 +50,6 @@ public class Config { // Must have parameters boolean autoCompact = true; - int valueSize; String dbDir; // Other parameters @@ -57,10 +65,6 @@ public boolean autoCompactEnabled() { return autoCompact; } - public int getValueSize() { - return valueSize; - } - public String getDbDir() { return dbDir; } @@ -99,4 +103,31 @@ public int getOpenFDCount() { public IndexMap getIndexMap() { return indexMap; } + public static int calculateRecordSize(int valueLength) { + validateValueLength(valueLength); + return RECORD_HEADER_SIZE + valueLength + RECORD_FOOTER_SIZE; + } + + public int getValueSize() { + throw new UnsupportedOperationException( + "getValueSize() not supported in variable-length mode. Use getMaxValueSize() or calculate per value."); + } + + public static int getMaxNodeSize() { + return calculateRecordSize(MAX_VALUE_SIZE); + } + + public static int getMinNodeSize() { + return calculateRecordSize(MIN_VALUE_SIZE); + } + + public static void validateValueLength(int valueLength) { + if (valueLength < MIN_VALUE_SIZE || valueLength > MAX_VALUE_SIZE) { + throw new IllegalArgumentException( + "Value length " + valueLength + " outside valid range [" + + MIN_VALUE_SIZE + ", " + MAX_VALUE_SIZE + "]"); + } + } + + } diff --git a/src/main/java/com/clevertap/stormdb/EntryConsumer.java b/src/main/java/com/clevertap/stormdb/EntryConsumer.java index 380c186..8f08be2 100644 --- a/src/main/java/com/clevertap/stormdb/EntryConsumer.java +++ b/src/main/java/com/clevertap/stormdb/EntryConsumer.java @@ -7,5 +7,5 @@ */ public interface EntryConsumer { - void accept(final int key, final byte[] data, final int offset) throws IOException; + void accept(final long key, final byte[] data, final int offset) throws IOException; } diff --git a/src/main/java/com/clevertap/stormdb/StormDB.java b/src/main/java/com/clevertap/stormdb/StormDB.java index 483e781..859dfe7 100644 --- a/src/main/java/com/clevertap/stormdb/StormDB.java +++ b/src/main/java/com/clevertap/stormdb/StormDB.java @@ -1,13 +1,10 @@ package com.clevertap.stormdb; -import com.clevertap.stormdb.exceptions.InconsistentDataException; -import com.clevertap.stormdb.exceptions.IncorrectConfigException; -import com.clevertap.stormdb.exceptions.ReservedKeyException; -import com.clevertap.stormdb.exceptions.StormDBException; -import com.clevertap.stormdb.exceptions.StormDBRuntimeException; +import com.clevertap.stormdb.exceptions.*; import com.clevertap.stormdb.maps.DefaultIndexMap; import com.clevertap.stormdb.maps.IndexMap; import com.clevertap.stormdb.internal.RandomAccessFilePool; +import com.clevertap.stormdb.utils.BitSetLong; import com.clevertap.stormdb.utils.ByteUtil; import com.clevertap.stormdb.utils.RecordUtil; import java.io.BufferedOutputStream; @@ -22,10 +19,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Enumeration; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -45,7 +39,7 @@ */ public class StormDB { - public static final int RESERVED_KEY_MARKER = 0xffffffff; + public static final long RESERVED_KEY_MARKER = 0xffffffffffffffffL; private static final String FILE_NAME_DATA = "data"; private static final String FILE_NAME_WAL = "wal"; @@ -58,7 +52,7 @@ public class StormDB { */ private final IndexMap index; - private BitSet dataInWalFile = new BitSet(); + private BitSetLong dataInWalFile = new BitSetLong(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -67,8 +61,6 @@ public class StormDB { private final Buffer buffer; private long lastBufferFlushTimeMs; - private final int recordSize; - private long bytesInWalFile = -1; // Will be initialised on the first write. private final File dbDirFile; private final Config conf; @@ -107,8 +99,6 @@ public class StormDB { index = conf.getIndexMap(); } - recordSize = conf.getValueSize() + Config.KEY_SIZE; - buffer = new Buffer(conf, false); lastBufferFlushTimeMs = System.currentTimeMillis(); @@ -293,7 +283,7 @@ private void buildIndexFromFile(boolean isWal) throws IOException { // A small price to pay for not needing bitsets. try { reader.readFromFile(wrapper, false, entry -> { - final int key = entry.getInt(); + final long key = entry.getLong(); index.put(key, fileIndex[0]++); if (isWal) { dataInWalFile.set(key); @@ -457,16 +447,17 @@ private void flushNext(OutputStream out, Buffer buffer) throws IOException { try { rwLock.writeLock().lock(); + final Enumeration iterator = buffer.iterator(false); while (iterator.hasMoreElements()) { final ByteBuffer byteBuffer = iterator.nextElement(); - final long address = RecordUtil - .indexToAddress(recordSize, compactionState.nextFileRecordIndex); + final long address = compactionState.nextFileRecordIndex; compactionState.nextFileRecordIndex++; - final int key = byteBuffer.getInt(); + final long key = byteBuffer.getLong(); + if (!compactionState.dataInNextWalFile.get(key)) { - index.put(key, RecordUtil.addressToIndex(recordSize, address)); + index.put(key, address); compactionState.dataInNextFile.set(key); } } @@ -476,21 +467,21 @@ private void flushNext(OutputStream out, Buffer buffer) throws IOException { buffer.clear(); } - public void put(final byte[] key, final byte[] value, final int valueOffset) throws IOException { - put(ByteUtil.toInt(key, 0), value, valueOffset); + put(ByteUtil.toLong(key, 0), value, valueOffset); } public void put(final byte[] key, final byte[] value) throws IOException { put(key, value, 0); } - public void put(int key, byte[] value) throws IOException { + public void put(long key, byte[] value) throws IOException { put(key, value, 0); } + LinkedList newk= new LinkedList<>(); - public void put(int key, byte[] value, int valueOffset) throws IOException { + public void put(long key, byte[] value, int valueOffset) throws IOException { if (exceptionDuringBackgroundOps != null) { throw new StormDBRuntimeException("Will not accept any further writes since the " + "last compaction resulted in an exception!", exceptionDuringBackgroundOps); @@ -504,7 +495,7 @@ public void put(int key, byte[] value, int valueOffset) throws IOException { try { boolean updatedInPlace = false; - final int recordIndexForKey = index.get(key); + final long recordIndexForKey = index.get(key); // Check if the key exists in the WAL file. if ((recordIndexForKey != RESERVED_KEY_MARKER) && ((isCompactionInProgress() && compactionState.dataInNextWalFile.get(key)) || (!isCompactionInProgress() && dataInWalFile.get(key)))) { @@ -546,6 +537,73 @@ public void put(int key, byte[] value, int valueOffset) throws IOException { } } + public void put(long key, byte[] value, int valueOffset) throws IOException { + if (exceptionDuringBackgroundOps != null) { + throw new StormDBRuntimeException("Will not accept any further writes since the " + + "last compaction resulted in an exception!", exceptionDuringBackgroundOps); + } + if (key == RESERVED_KEY_MARKER) { + throw new ReservedKeyException(RESERVED_KEY_MARKER); + } + + rwLock.writeLock().lock(); + try { + boolean updatedInPlace = false; + + // Index now stores FILE ADDRESSES (byte positions) + final long address = index.get(key); + + // Check if the key exists in the WAL file for in-place update + if ((address != RESERVED_KEY_MARKER) && + ((isCompactionInProgress() && compactionState.dataInNextWalFile.get(key)) || + (!isCompactionInProgress() && dataInWalFile.get(key)))) { + + // Direct address comparison + if (address >= bytesInWalFile) { + int addressInBuffer = (int)(address - bytesInWalFile); + // NEW: update() method now handles variable-length values + updatedInPlace = buffer.update(key, value, valueOffset, addressInBuffer); + } + } + + // If we couldn't update in place, add new record + if (!updatedInPlace) { + try { + // NEW: Try to add to buffer (may throw BufferFullException) + final int addressInBuffer = buffer.add(key, value, valueOffset); + + // NEW: Store direct file address (no RecordUtil conversion) + final long absoluteAddress = bytesInWalFile + addressInBuffer; + index.put(key, absoluteAddress); + + } catch (BufferFullException e) { + // NEW: Handle buffer full gracefully with exception handling + flush(); + + // Notify compaction thread + synchronized (compactionSync) { + compactionSync.notifyAll(); + } + + // NEW: Retry adding after flush + final int addressInBuffer = buffer.add(key, value, valueOffset); + final long absoluteAddress = bytesInWalFile + addressInBuffer; + index.put(key, absoluteAddress); + } + } + + // Mark key as present in WAL (UNCHANGED) + if (isCompactionInProgress()) { + compactionState.dataInNextWalFile.set(key); + } else { + dataInWalFile.set(key); + } + + } finally { + rwLock.writeLock().unlock(); + } + } + public void flush() throws IOException { rwLock.writeLock().lock(); try { @@ -658,22 +716,22 @@ private void iterate(final boolean useLatestWalFile, final boolean readInMemoryB } } - public byte[] randomGet(final int key) throws IOException, StormDBException { - int recordIndex; + public byte[] randomGet(final long key) throws IOException, StormDBException { + long recordAddress; final RandomAccessFileWrapper f; byte[] value; rwLock.readLock().lock(); final long address; try { - recordIndex = index.get(key); - if (recordIndex == RESERVED_KEY_MARKER) { // No mapping value. + recordAddress = index.get(key); + if (recordAddress == RESERVED_KEY_MARKER) { // No mapping value. return null; // NOSONAR - returning null is a part of the interface. } value = new byte[conf.getValueSize()]; if (isCompactionInProgress() && compactionState.dataInNextWalFile.get(key)) { - address = RecordUtil.indexToAddress(recordSize, recordIndex); + address = RecordUtil.indexToAddress(recordSize, recordAddress); if (address >= bytesInWalFile) { System.arraycopy(buffer.array(), (int) (address - bytesInWalFile + Config.KEY_SIZE), @@ -682,10 +740,10 @@ public byte[] randomGet(final int key) throws IOException, StormDBException { } f = filePool.borrowObject(compactionState.nextWalFile); } else if (isCompactionInProgress() && compactionState.dataInNextFile.get(key)) { - address = RecordUtil.indexToAddress(recordSize, recordIndex); + address = RecordUtil.indexToAddress(recordSize, recordAddress); f = filePool.borrowObject(compactionState.nextDataFile); } else if (dataInWalFile.get(key)) { - address = RecordUtil.indexToAddress(recordSize, recordIndex); + address = RecordUtil.indexToAddress(recordSize, recordAddress); // If compaction is in progress, we can not read in-memory. if (!isCompactionInProgress() && address >= bytesInWalFile) { System.arraycopy(buffer.array(), @@ -695,7 +753,7 @@ public byte[] randomGet(final int key) throws IOException, StormDBException { } f = filePool.borrowObject(walFile); } else { - address = RecordUtil.indexToAddress(recordSize, recordIndex); + address = RecordUtil.indexToAddress(recordSize, recordAddress); f = filePool.borrowObject(dataFile); } } finally { diff --git a/src/main/java/com/clevertap/stormdb/exceptions/BufferFullException.java b/src/main/java/com/clevertap/stormdb/exceptions/BufferFullException.java new file mode 100644 index 0000000..c549361 --- /dev/null +++ b/src/main/java/com/clevertap/stormdb/exceptions/BufferFullException.java @@ -0,0 +1,16 @@ +package com.clevertap.stormdb.exceptions; + +public class BufferFullException extends RuntimeException { + private final int requiredSize; + private final int availableSize; + + public BufferFullException(int requiredSize, int availableSize) { + super("Buffer full: required " + requiredSize + " bytes, available " + availableSize + " bytes"); + this.requiredSize = requiredSize; + this.availableSize = availableSize; + } + + public int getRequiredSize() { return requiredSize; } + public int getAvailableSize() { return availableSize; } +} + diff --git a/src/main/java/com/clevertap/stormdb/exceptions/ReservedKeyException.java b/src/main/java/com/clevertap/stormdb/exceptions/ReservedKeyException.java index c74ca08..14d92ec 100644 --- a/src/main/java/com/clevertap/stormdb/exceptions/ReservedKeyException.java +++ b/src/main/java/com/clevertap/stormdb/exceptions/ReservedKeyException.java @@ -5,7 +5,7 @@ */ public class ReservedKeyException extends RuntimeException { - public ReservedKeyException(final int key) { - super("The key 0x" + Integer.toHexString(key) + " is reserved for internal structures."); + public ReservedKeyException(final long key) { + super("The key 0x" + Long.toHexString(key) + " is reserved for internal structures."); } } diff --git a/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java b/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java index d108def..29e77df 100644 --- a/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java +++ b/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java @@ -2,9 +2,11 @@ import com.clevertap.stormdb.StormDB; import gnu.trove.map.hash.TIntIntHashMap; +import gnu.trove.map.hash.TLongIntHashMap; +import gnu.trove.map.hash.TLongLongHashMap; public class DefaultIndexMap implements IndexMap { - private final TIntIntHashMap indexMap; + private final TLongLongHashMap indexMap; private static final int DEFAULT_INITIAL_CAPACITY = 100_000; private static final float DEFAULT_LOAD_FACTOR = 0.95f; @@ -13,17 +15,17 @@ public DefaultIndexMap() { } public DefaultIndexMap(int initialCapacity, float loadFactor) { - indexMap = new TIntIntHashMap(initialCapacity, loadFactor, StormDB.RESERVED_KEY_MARKER, + indexMap = new TLongLongHashMap(initialCapacity, loadFactor, StormDB.RESERVED_KEY_MARKER, StormDB.RESERVED_KEY_MARKER); } @Override - public void put(int key, int indexValue) { - indexMap.put(key, indexValue); + public void put(long key, long addressValue) { + indexMap.put(key, addressValue); } @Override - public int get(int key) { + public long get(long key) { return indexMap.get(key); } diff --git a/src/main/java/com/clevertap/stormdb/maps/IndexMap.java b/src/main/java/com/clevertap/stormdb/maps/IndexMap.java index 038e45f..3ba4d24 100644 --- a/src/main/java/com/clevertap/stormdb/maps/IndexMap.java +++ b/src/main/java/com/clevertap/stormdb/maps/IndexMap.java @@ -5,21 +5,20 @@ public interface IndexMap { /** - * API to support put for the key/index pair in question. Key {@link StormDB#RESERVED_KEY_MARKER} + * API to support put for the key/address pair in question. Key {@link StormDB#RESERVED_KEY_MARKER} * is reserved and custom implementations must make sure reserved keys are not used. * @param key The key to be inserted - * @param indexValue The index mapping for the key + * @param address The index mapping for the key */ - void put(int key, int indexValue); - + void put(long key, long address); /** * API to support for get for the key. If get fails, return {@link StormDB#RESERVED_KEY_MARKER} * which represents null or not found. - * @param key The key whose index value is to be retrieved. - * @return The index value for the key asked. + * @param key The key whose address value is to be retrieved. + * @return The address value for the key asked. */ - int get(int key); + long get(long key); /** diff --git a/src/main/java/com/clevertap/stormdb/utils/BitSetList.java b/src/main/java/com/clevertap/stormdb/utils/BitSetList.java new file mode 100644 index 0000000..ccd0cbe --- /dev/null +++ b/src/main/java/com/clevertap/stormdb/utils/BitSetList.java @@ -0,0 +1,15 @@ +package com.clevertap.stormdb.utils; + +public interface BitSetList { + int BYTES_IN_ONE_BIT_SET = (Integer.MAX_VALUE / 8); + int MAX_BITS_IN_BIT_SET = BYTES_IN_ONE_BIT_SET * 8; + + void ensureIdx(int idx); + + void set(long uid); + + boolean get(long uid); + + void unset(long uid); +} + diff --git a/src/main/java/com/clevertap/stormdb/utils/BitSetLong.java b/src/main/java/com/clevertap/stormdb/utils/BitSetLong.java new file mode 100644 index 0000000..66e5c3e --- /dev/null +++ b/src/main/java/com/clevertap/stormdb/utils/BitSetLong.java @@ -0,0 +1,73 @@ +package com.clevertap.stormdb.utils; + + +import gnu.trove.procedure.TLongProcedure; +import java.util.ArrayList; +import java.util.BitSet; + +public class BitSetLong implements BitSetList { + + ArrayList bitSetList; + + public BitSetLong() { + bitSetList = new ArrayList<>(); + } + + @Override + public void ensureIdx(int idx) { + if (idx >= bitSetList.size()) { + while (bitSetList.size() < idx + 1) { + bitSetList.add(null); + } + } + + if (bitSetList.get(idx) == null) { + bitSetList.set(idx, new BitSet()); + } + } + + @Override + public void set(long long_uid) { + int idx = (int) (long_uid / MAX_BITS_IN_BIT_SET); + ensureIdx(idx); + int toAdd = (int) (long_uid % MAX_BITS_IN_BIT_SET); + bitSetList.get(idx).set(toAdd); + } + + @Override + public boolean get(long long_uid) { + int idx = (int) (long_uid / MAX_BITS_IN_BIT_SET); + return idx < bitSetList.size() && bitSetList.get(idx) != null + && bitSetList.get(idx).get((int) (long_uid % MAX_BITS_IN_BIT_SET)); + } + + public void unset(long long_uid) { + int idx = (int) (long_uid / MAX_BITS_IN_BIT_SET); + if (get(long_uid)) { + bitSetList.get(idx).set((int) (long_uid % MAX_BITS_IN_BIT_SET), false); + } + } + + public void forEach(TLongProcedure t) { + long i = 0; + for (BitSet bitSet : bitSetList) { + if (bitSet != null) { + int b = bitSet.nextSetBit(0); + while (b != -1) { + t.execute(b + (i * MAX_BITS_IN_BIT_SET)); + b = bitSet.nextSetBit(b + 1); + } + } + i++; + } + } + + // TODO Fix this, the old version #toByteArrayOld() is correct but causes OOM + // this is temp fix to supress OOM + public byte[] toByteArray() { + if (bitSetList == null || bitSetList.isEmpty() || bitSetList.get(0) == null) { + return new byte[0]; + } + return bitSetList.get(0).toByteArray(); + } +} diff --git a/src/main/java/com/clevertap/stormdb/utils/ByteUtil.java b/src/main/java/com/clevertap/stormdb/utils/ByteUtil.java index b4e3302..7c8cde4 100644 --- a/src/main/java/com/clevertap/stormdb/utils/ByteUtil.java +++ b/src/main/java/com/clevertap/stormdb/utils/ByteUtil.java @@ -24,6 +24,17 @@ public static int toInt(final byte[] data, final int offset) { | data[offset + 3] & 0xFF; } + public static long toLong(final byte[] data, final int offset) { + return (long) data[offset] << 56 + | ((long) (data[offset + 1] & 0xFF) << 48) + | ((long) (data[offset + 2] & 0xFF) << 40) + | ((long) (data[offset + 3] & 0xFF) << 32) + | ((long) (data[offset + 4] & 0xFF) << 24) + | ((long) (data[offset + 5] & 0xFF) << 16) + | ((long) (data[offset + 6] & 0xFF) << 8) + | (long) (data[offset + 7] & 0xFF); + } + public static boolean arrayEquals(byte[] primitiveBytes, Deque bytes) { if (bytes == null || primitiveBytes == null) { return false;