Skip to content

Commit 90994bb

Browse files
authored
HDDS-14330. MinHeapMergeIterator should use key comparator while popping out entries from the heap (apache#9576)
1 parent 14e4e1f commit 90994bb

File tree

3 files changed

+43
-36
lines changed

3 files changed

+43
-36
lines changed

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterat
198198
KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
199199
Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
200200
return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
201-
KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
201+
KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
202202
@Override
203203
protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
204204
return table[idx].iterator(prefix);

hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator) {
5555
keys = new HashMap<>(numberOfIterators);
5656
iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I) null).collect(Collectors.toList());
5757
this.initialized = false;
58-
this.comparator = comparator;
58+
this.comparator = Objects.requireNonNull(comparator, "comparator cannot be null");
5959
}
6060

6161
protected abstract I getIterator(int idx) throws IOException;
@@ -109,7 +109,7 @@ public V next() {
109109
// Clear the keys list by setting all entries to null.
110110
keys.clear();
111111
// Advance all entries with the same key (from different files)
112-
while (!minHeap.isEmpty() && Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
112+
while (!minHeap.isEmpty() && comparator.compare(minHeap.peek().getCurrentKey(), currentKey) == 0) {
113113
HeapEntry<K> entry = minHeap.poll();
114114
int idx = entry.index;
115115
// Set the key for the current entry in the keys list.

hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@
2626
import com.google.common.collect.ImmutableList;
2727
import com.google.common.collect.ImmutableMap;
2828
import com.google.common.collect.ImmutableSet;
29+
import com.google.common.primitives.UnsignedBytes;
2930
import java.io.Closeable;
3031
import java.io.IOException;
3132
import java.io.UncheckedIOException;
3233
import java.util.ArrayList;
34+
import java.util.Arrays;
3335
import java.util.Collections;
3436
import java.util.Comparator;
3537
import java.util.HashSet;
@@ -38,14 +40,15 @@
3840
import java.util.Map;
3941
import java.util.NoSuchElementException;
4042
import java.util.Set;
43+
import org.apache.hadoop.hdds.StringUtils;
4144
import org.junit.jupiter.api.Test;
4245

4346
/**
4447
* Unit tests for {@link MinHeapMergeIterator}.
4548
*/
4649
class TestMinHeapMergeIterator {
4750

48-
private static final Comparator<String> STRING_COMPARATOR = String::compareTo;
51+
private static final Comparator<byte[]> BYTE_COMPARATOR = UnsignedBytes.lexicographicalComparator();
4952

5053
/**
5154
* A closeable iterator which tracks close() calls.
@@ -87,8 +90,8 @@ private static final class MergeResult {
8790
private final String key;
8891
private final Set<Integer> sources;
8992

90-
private MergeResult(String key, Set<Integer> sources) {
91-
this.key = key;
93+
private MergeResult(byte[] key, Set<Integer> sources) {
94+
this.key = StringUtils.bytes2String(key);
9295
this.sources = sources;
9396
}
9497

@@ -104,17 +107,17 @@ Set<Integer> getSources() {
104107
/**
105108
* Concrete implementation for tests.
106109
*/
107-
private static final class TestIterator extends MinHeapMergeIterator<String,
108-
TrackingCloseableIterator<String>, MergeResult> {
110+
private static final class TestIterator extends MinHeapMergeIterator<byte[],
111+
TrackingCloseableIterator<byte[]>, MergeResult> {
109112

110-
private final List<TrackingCloseableIterator<String>> itrs;
113+
private final List<TrackingCloseableIterator<byte[]>> itrs;
111114
private final List<MergeResult> merged = new ArrayList<>();
112115

113116
private IOException ioExceptionAtIndex;
114117
private int exceptionIndex = -1;
115118

116-
private TestIterator(List<TrackingCloseableIterator<String>> itrs) {
117-
super(itrs.size(), STRING_COMPARATOR);
119+
private TestIterator(List<TrackingCloseableIterator<byte[]>> itrs) {
120+
super(itrs.size(), BYTE_COMPARATOR);
118121
this.itrs = itrs;
119122
}
120123

@@ -125,7 +128,7 @@ private TestIterator withGetIteratorIOException(int index, IOException ex) {
125128
}
126129

127130
@Override
128-
protected TrackingCloseableIterator<String> getIterator(int idx)
131+
protected TrackingCloseableIterator<byte[]> getIterator(int idx)
129132
throws IOException {
130133
if (idx == exceptionIndex) {
131134
if (ioExceptionAtIndex != null) {
@@ -136,9 +139,9 @@ protected TrackingCloseableIterator<String> getIterator(int idx)
136139
}
137140

138141
@Override
139-
protected MergeResult merge(Map<Integer, String> keysToMerge) {
142+
protected MergeResult merge(Map<Integer, byte[]> keysToMerge) {
140143
// All values in keysToMerge are expected to be equal (same key across iterators).
141-
String key = keysToMerge.values().iterator().next();
144+
byte[] key = keysToMerge.values().iterator().next();
142145
MergeResult r = new MergeResult(key, new HashSet<>(keysToMerge.keySet()));
143146
merged.add(r);
144147
return r;
@@ -149,14 +152,18 @@ List<MergeResult> getMerged() {
149152
}
150153
}
151154

155+
private ImmutableList<byte[]> toBytesList(String... keys) {
156+
return Arrays.stream(keys).map(StringUtils::string2Bytes).collect(ImmutableList.toImmutableList());
157+
}
158+
152159
@Test
153160
void testMergedOrderAndDuplicateGroupingAndAutoCloseOnExhaustion() {
154-
TrackingCloseableIterator<String> itr0 =
155-
new TrackingCloseableIterator<>(ImmutableList.of("a", "c", "e", "g"));
156-
TrackingCloseableIterator<String> itr1 =
157-
new TrackingCloseableIterator<>(ImmutableList.of("b", "c", "d", "g", "h"));
158-
TrackingCloseableIterator<String> itr2 =
159-
new TrackingCloseableIterator<>(ImmutableList.of("c", "e", "f", "h"));
161+
TrackingCloseableIterator<byte[]> itr0 =
162+
new TrackingCloseableIterator<>(toBytesList("a", "c", "e", "g"));
163+
TrackingCloseableIterator<byte[]> itr1 =
164+
new TrackingCloseableIterator<>(toBytesList("b", "c", "d", "g", "h"));
165+
TrackingCloseableIterator<byte[]> itr2 =
166+
new TrackingCloseableIterator<>(toBytesList("c", "e", "f", "h"));
160167

161168
List<String> keys = new ArrayList<>();
162169
try (TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1, itr2))) {
@@ -195,10 +202,10 @@ void testMergedOrderAndDuplicateGroupingAndAutoCloseOnExhaustion() {
195202

196203
@Test
197204
void testInitClosesEmptyIterators() {
198-
TrackingCloseableIterator<String> empty =
205+
TrackingCloseableIterator<byte[]> empty =
199206
new TrackingCloseableIterator<>(Collections.emptyList());
200-
TrackingCloseableIterator<String> nonEmpty =
201-
new TrackingCloseableIterator<>(ImmutableList.of("a"));
207+
TrackingCloseableIterator<byte[]> nonEmpty =
208+
new TrackingCloseableIterator<>(toBytesList("a"));
202209

203210
try (TestIterator mergeItr = new TestIterator(ImmutableList.of(empty, nonEmpty))) {
204211
assertTrue(mergeItr.hasNext()); // triggers init
@@ -212,10 +219,10 @@ void testInitClosesEmptyIterators() {
212219

213220
@Test
214221
void testCloseClosesAllIterators() {
215-
TrackingCloseableIterator<String> itr0 =
216-
new TrackingCloseableIterator<>(ImmutableList.of("a", "c"));
217-
TrackingCloseableIterator<String> itr1 =
218-
new TrackingCloseableIterator<>(ImmutableList.of("b", "d"));
222+
TrackingCloseableIterator<byte[]> itr0 =
223+
new TrackingCloseableIterator<>(toBytesList("a", "c"));
224+
TrackingCloseableIterator<byte[]> itr1 =
225+
new TrackingCloseableIterator<>(toBytesList("b", "d"));
219226

220227
try (TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1))) {
221228
assertTrue(mergeItr.hasNext()); // triggers init
@@ -233,10 +240,10 @@ void testCloseClosesAllIterators() {
233240
@Test
234241
void testHasNextWrapsIOExceptionFromGetIterator() {
235242
IOException expected = new IOException("boom");
236-
TrackingCloseableIterator<String> itr0 =
237-
new TrackingCloseableIterator<>(ImmutableList.of("a"));
238-
TrackingCloseableIterator<String> itr1 =
239-
new TrackingCloseableIterator<>(ImmutableList.of("b"));
243+
TrackingCloseableIterator<byte[]> itr0 =
244+
new TrackingCloseableIterator<>(toBytesList("a"));
245+
TrackingCloseableIterator<byte[]> itr1 =
246+
new TrackingCloseableIterator<>(toBytesList("b"));
240247
TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1));
241248
mergeItr.withGetIteratorIOException(1, expected);
242249
try (TestIterator ignored = mergeItr) {
@@ -253,10 +260,10 @@ void testHasNextWrapsIOExceptionFromGetIterator() {
253260

254261
@Test
255262
void testHasNextWrapsRocksDBExceptionFromGetIteratorAndClosesOpenedIterators() throws Exception {
256-
TrackingCloseableIterator<String> itr0 =
257-
new TrackingCloseableIterator<>(ImmutableList.of("a", "b"));
258-
TrackingCloseableIterator<String> itr1 =
259-
new TrackingCloseableIterator<>(ImmutableList.of("c"));
263+
TrackingCloseableIterator<byte[]> itr0 =
264+
new TrackingCloseableIterator<>(toBytesList("a", "b"));
265+
TrackingCloseableIterator<byte[]> itr1 =
266+
new TrackingCloseableIterator<>(toBytesList("c"));
260267
RocksDatabaseException rdbEx = new RocksDatabaseException("rocks");
261268
TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1));
262269
mergeItr.withGetIteratorIOException(1, rdbEx);
@@ -275,7 +282,7 @@ void testHasNextWrapsRocksDBExceptionFromGetIteratorAndClosesOpenedIterators() t
275282

276283
@Test
277284
void testNextWhenEmptyThrowsNoSuchElement() {
278-
TrackingCloseableIterator<String> empty =
285+
TrackingCloseableIterator<byte[]> empty =
279286
new TrackingCloseableIterator<>(Collections.emptyList());
280287
try (TestIterator mergeItr = new TestIterator(ImmutableList.of(empty))) {
281288
assertFalse(mergeItr.hasNext());

0 commit comments

Comments
 (0)