Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
table.deleteWithBatch(batch, key);
}

/** Delegates the batched range delete ([beginKey, endKey), end exclusive) to the underlying table. */
@Override
public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
table.deleteRangeWithBatch(batch, beginKey, endKey);
}

@Override
public final KeyValueIterator<KEY, VALUE> iterator(KEY prefix, IteratorType type) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;
Expand Down Expand Up @@ -84,7 +89,7 @@ private static String countSize2String(int count, long size) {
* To implement {@link #equals(Object)} and {@link #hashCode()}
* based on the contents of the bytes.
*/
static final class Bytes implements Closeable {
static final class Bytes implements Comparable<Bytes>, Closeable {
private final AbstractSlice<?> slice;
/** Cache the hash value. */
private final int hash;
Expand Down Expand Up @@ -130,6 +135,12 @@ public String toString() {
return slice.toString();
}

// Mimics RocksDB's ByteWiseComparator by delegating to the native
// AbstractSlice#compare, so ordering here matches the on-disk key order
// used by deleteRange — required for correct range-containment checks.
@Override
public int compareTo(RDBBatchOperation.Bytes that) {
return this.slice.compare(that.slice);
}

@Override
public void close() {
slice.close();
Expand Down Expand Up @@ -239,6 +250,46 @@ boolean closeImpl() {
}
}

/**
* Delete range operation to be applied to a {@link ColumnFamily} batch.
*/
private final class DeleteRangeOperation extends Op {
private final byte[] startKey;
private final byte[] endKey;
private final Bytes startKeyBytes;
private final Bytes endKeyBytes;

private DeleteRangeOperation(byte[] startKey, byte[] endKey) {
this.startKey = Objects.requireNonNull(startKey, "startKey == null");
this.endKey = Objects.requireNonNull(endKey, "endKey == null");
this.startKeyBytes = new Bytes(startKey);
this.endKeyBytes = new Bytes(endKey);
Comment on lines +263 to +266
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeleteRangeOperation keeps references to the caller-provided startKey/endKey byte arrays. If those arrays are reused or mutated by the caller after enqueueing, the batch contents and range matching (contains) can become incorrect. Consider defensively copying the arrays (and using the copies for both the byte[] fields and the Bytes wrappers).

Suggested change
this.startKey = Objects.requireNonNull(startKey, "startKey == null");
this.endKey = Objects.requireNonNull(endKey, "endKey == null");
this.startKeyBytes = new Bytes(startKey);
this.endKeyBytes = new Bytes(endKey);
Objects.requireNonNull(startKey, "startKey == null");
Objects.requireNonNull(endKey, "endKey == null");
this.startKey = startKey.clone();
this.endKey = endKey.clone();
this.startKeyBytes = new Bytes(this.startKey);
this.endKeyBytes = new Bytes(this.endKey);

Copilot uses AI. Check for mistakes.
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchDeleteRange(batch, startKey, endKey);
}

@Override
int totalLength() {
return startKey.length + endKey.length;
}

@Override
boolean closeImpl() {
if (super.closeImpl()) {
IOUtils.close(LOG, startKeyBytes, endKeyBytes);
return true;
}
return false;
}

private boolean contains(Bytes key) {
return startKeyBytes.compareTo(key) <= 0 && endKeyBytes.compareTo(key) > 0;
}
}

/** Cache and deduplicate db ops (put/delete). */
private class OpCache {
/** A (family name -> {@link FamilyCache}) map. */
Expand All @@ -249,34 +300,126 @@ private class FamilyCache {
private final ColumnFamily family;

/**
* A mapping of keys to operations for batch processing in the {@link FamilyCache}.
* The keys are represented as {@link Bytes} objects, encapsulating the byte array or buffer
* for efficient equality and hashing. The values are instances of {@link Op}, representing
* different types of operations that can be applied to a {@link ColumnFamily}.
* A mapping of operation keys to their respective indices in {@code FamilyCache}.
*
* Key details:
* - Maintains a mapping of unique operation keys to their insertion or processing order.
* - Used internally to manage and sort operations during batch writes.
* - Facilitates filtering, overwriting, or deletion of operations based on their keys.
*
* This field is intended to store pending batch updates before they are written to the database.
* It supports operations such as additions and deletions while maintaining the ability to overwrite
* existing entries when necessary.
* Constraints:
* - Keys must be unique, represented using {@link Bytes}, to avoid collisions.
* - Each key is associated with a unique integer index to track insertion order.
*
* This field plays a critical role in managing the logical consistency and proper execution
* order of operations stored in the batch when interacting with a RocksDB-backed system.
*/
private final Map<Bytes, SingleKeyOp> ops = new HashMap<>();
private final Map<Bytes, Integer> singleOpKeys = new HashMap<>();
/**
* Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances.
*
* This map serves as the primary container for recording operations in preparation for a batch write
* within a RocksDB-backed system. Each operation is referenced by an integer index, which determines
* its insertion order and ensures correct sequencing during batch execution.
*
* Key characteristics:
* - Stores operations of type {@code Operation}.
* - Uses a unique integer key (index) for mapping each operation.
* - Serves as an intermediary structure during batch preparation and execution.
*
* Usage context:
* - This map is managed as part of the batch-writing process, which involves organizing,
* filtering, and applying multiple operations in a single cohesive batch.
* - Operations stored in this map are expected to define specific actions (e.g., put, delete,
* delete range) and their associated data (e.g., keys, values).
*/
private final Map<Integer, Op> ops = new HashMap<>();
private boolean isCommit;

private long batchSize;
private long discardedSize;
private int discardedCount;
private int putCount;
private int delCount;
private int delRangeCount;
private AtomicInteger opIndex;

FamilyCache(ColumnFamily family) {
this.family = family;
// Monotonically increasing index assigned to each queued op; used to
// replay operations in insertion order at batch-write time.
this.opIndex = new AtomicInteger(0);
}

/**
 * Returns the first delete-range op in {@code deleteRangeOps} whose
 * [start, end) range contains {@code key}, or {@code null} if none matches.
 */
private DeleteRangeOperation findFirstDeleteRangeMatchingRange(Collection<DeleteRangeOperation> deleteRangeOps,
    Bytes key) {
  return deleteRangeOps.stream()
      .filter(rangeOp -> rangeOp.contains(key))
      .findFirst()
      .orElse(null);
}

/** Prepare batch write for the entire family. */
/**
* Prepares a batch write operation for a RocksDB-backed system.
*
* This method ensures the orderly execution of operations accumulated in the batch,
* respecting their respective types and order of insertion.
*
* Key functionalities:
* 1. Ensures that the batch is not already committed before proceeding.
* 2. Sorts all operations by their `opIndex` to maintain a consistent execution order.
* 3. Filters and adapts operations to account for any delete range operations that might
* affect other operations in the batch:
* - Operations with keys that fall within the range specified by a delete range operation
* are discarded.
* - Delete range operations are executed in their correct order.
* 4. Applies remaining operations to the write batch, ensuring proper filtering and execution.
* 5. Logs a summary of the batch execution for debugging purposes.
*
* Throws:
* - RocksDatabaseException if any error occurs while applying operations to the write batch.
*
* Prerequisites:
* - The method assumes that the operations are represented by `Operation` objects, each of which
* encapsulates the logic for its specific type.
* - Delete range operations must be represented by the `DeleteRangeOperation` class.
*/
void prepareBatchWrite() throws RocksDatabaseException {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
isCommit = true;
for (Op op : ops.values()) {
op.apply(family, writeBatch);
// Sort Entries based on opIndex and flush the operation to the batch in the same order.
List<Op> opList = ops.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
.map(Map.Entry::getValue).collect(Collectors.toList());
TreeMap<Integer, DeleteRangeOperation> deleteRangeIndices = new TreeMap<>();
int index = 0;
for (Op op : opList) {
if (op instanceof DeleteRangeOperation) {
DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op;
deleteRangeIndices.put(index, deleteRangeOp);
}
index++;
}
index = 0;
for (Op op : opList) {
if (op instanceof DeleteRangeOperation) {
op.apply(family, writeBatch);
} else {
// Find the first delete range op matching which would contain the key after the
// operation has occurred. If there is no such operation then perform the operation otherwise discard the
// op.
SingleKeyOp singleKeyOp = (SingleKeyOp) op;
DeleteRangeOperation deleteRangeOp = findFirstDeleteRangeMatchingRange(
deleteRangeIndices.tailMap(index, false).values(), singleKeyOp.getKeyBytes());
if (deleteRangeOp == null) {
op.apply(family, writeBatch);
} else {
debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)",
singleKeyOp.getKeyBytes(), deleteRangeOp.startKeyBytes, deleteRangeOp.endKeyBytes));
discardedCount++;
discardedSize += op.totalLength();
}
}
index++;
}
debug(this::summary);
}
Expand All @@ -299,8 +442,10 @@ void clear() {
}

private void deleteIfExist(Bytes key) {
final SingleKeyOp previous = ops.remove(key);
if (previous != null) {
// remove previous first in order to call release()
Integer previousIndex = singleOpKeys.remove(key);
if (previousIndex != null) {
final SingleKeyOp previous = (SingleKeyOp) ops.remove(previousIndex);
previous.close();
discardedSize += previous.totalLength();
discardedCount++;
Expand All @@ -314,9 +459,10 @@ void overwriteIfExists(SingleKeyOp op) {
Bytes key = op.getKeyBytes();
deleteIfExist(key);
batchSize += op.totalLength();
Op overwritten = ops.put(key, op);
Preconditions.checkState(overwritten == null);

int newIndex = opIndex.getAndIncrement();
final Integer overwrittenOpKey = singleOpKeys.put(key, newIndex);
final Op overwrittenOp = ops.put(newIndex, op);
Preconditions.checkState(overwrittenOpKey == null && overwrittenOp == null);
debug(() -> String.format("%s %s, %s; key=%s", this,
op instanceof DeleteOp ? delString(op.totalLength()) : putString(op.keyLen(), op.valLen()),
batchSizeDiscardedString(), key));
Expand All @@ -332,6 +478,11 @@ void delete(CodecBuffer key) {
overwriteIfExists(new DeleteOp(key));
}

void deleteRange(byte[] startKey, byte[] endKey) {
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FamilyCache.deleteRange(...) bypasses the isCommit guard used by put/delete (via overwriteIfExists). If deleteRange is called after prepareBatchWrite() has started, the operation can be silently dropped (added after opList is built). Add the same Preconditions.checkState(!isCommit, ...) protection here.

Suggested change
void deleteRange(byte[] startKey, byte[] endKey) {
void deleteRange(byte[] startKey, byte[] endKey) {
Preconditions.checkState(!isCommit, "%s is already committed.", this);

Copilot uses AI. Check for mistakes.
delRangeCount++;
ops.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleteRange(...) increments delRangeCount but does not update batchSize/discard accounting. This makes batchSizeDiscardedString() and the “discarding changes” warning unreliable for batches that include delete-range ops. Consider adding the range op’s totalLength() to batchSize (and/or tracking delete-range bytes separately) for consistent accounting.

Suggested change
ops.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
DeleteRangeOperation op = new DeleteRangeOperation(startKey, endKey);
batchSize += op.totalLength();
ops.put(opIndex.getAndIncrement(), op);

Copilot uses AI. Check for mistakes.
}

String putString(int keySize, int valueSize) {
return String.format("put(key: %s, value: %s), #put=%s",
byteSize2String(keySize), byteSize2String(valueSize), putCount);
Expand Down Expand Up @@ -361,6 +512,11 @@ void delete(ColumnFamily family, CodecBuffer key) {
name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
}

/** Queues a range delete for {@code family}, creating its {@link FamilyCache} on first use. */
void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family))
.deleteRange(startKey, endKey);
}

/** Prepare batch write for the entire cache. */
UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
Expand All @@ -382,19 +538,21 @@ String getCommitString() {
int opSize = 0;
int discardedCount = 0;
int discardedSize = 0;
int delRangeCount = 0;

for (FamilyCache f : name2cache.values()) {
putCount += f.putCount;
delCount += f.delCount;
opSize += f.batchSize;
discardedCount += f.discardedCount;
discardedSize += f.discardedSize;
delRangeCount += f.delRangeCount;
}

final int opCount = putCount + delCount;
return String.format(
"#put=%s, #del=%s, batchSize: %s, discarded: %s, committed: %s",
putCount, delCount,
"#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s",
putCount, delCount, delRangeCount,
countSize2String(opCount, opSize),
countSize2String(discardedCount, discardedSize),
countSize2String(opCount - discardedCount, opSize - discardedSize));
Expand Down Expand Up @@ -449,4 +607,8 @@ public void put(ColumnFamily family, byte[] key, byte[] value) {
opCache.put(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key),
DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value));
}

/**
 * Adds a delete-range operation to this batch for the given column family.
 * @param startKey start of the range, inclusive.
 * @param endKey end of the range, exclusive.
 */
public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
opCache.deleteRange(family, startKey, endKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,15 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) {

}

/**
 * Queues a range delete ([beginKey, endKey)) on the given batch.
 * Only {@link RDBBatchOperation} batches are supported.
 */
@Override
public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) {
  // Guard clause: reject unsupported batch implementations up front.
  if (!(batch instanceof RDBBatchOperation)) {
    throw new IllegalArgumentException("batch should be RDBBatchOperation");
  }
  ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey);
}

@Override
public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix, IteratorType type)
throws RocksDatabaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,16 @@ public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key)
}
}

/**
 * Adds a range delete to {@code writeBatch} for this column family,
 * covering keys in [beginKey, endKey) per RocksDB WriteBatch#deleteRange.
 * @throws RocksDatabaseException if the underlying RocksDB call fails.
 */
public void batchDeleteRange(ManagedWriteBatch writeBatch, byte[] beginKey, byte[] endKey)
throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire()) {
writeBatch.deleteRange(getHandle(), beginKey, endKey);
} catch (RocksDBException e) {
throw toRocksDatabaseException(this, "batchDeleteRange key " + bytes2String(beginKey) + " - " +
bytes2String(endKey), e);
}
}

public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key,
ByteBuffer value) throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException
*/
void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException;

/**
 * Deletes a range of keys from the metadata store as part of a batch operation.
 * @param batch Batch operation to perform the delete operation.
 * @param beginKey start metadata key, inclusive.
 * @param endKey end metadata key, exclusive.
 * @throws CodecException if a boundary key cannot be encoded.
 */
void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException;

/**
* Deletes a range of keys from the metadata store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
}
}

/** Encodes both boundary keys and delegates the batched range delete to the raw table. */
@Override
public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey));
}

@Override
public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException {
rawTable.deleteRange(encodeKey(beginKey), encodeKey(endKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) {
throw new UnsupportedOperationException();
}

/** Batched range delete is not supported by this in-memory table implementation. */
@Override
public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) {
throw new UnsupportedOperationException();
}

@Override
public void deleteRange(KEY beginKey, KEY endKey) {
map.subMap(beginKey, endKey).clear();
Expand Down
Loading