Skip to content

Commit 2fe7fde

Browse files
committed
feat: support codecbuffer in SnapshotDiffJobCodec
1 parent 81f9d64 commit 2fe7fde

File tree

2 files changed

+136
-0
lines changed

2 files changed

+136
-0
lines changed

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotDiffJob.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@
2121
import com.fasterxml.jackson.databind.DeserializationFeature;
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.google.protobuf.InvalidProtocolBufferException;
24+
import jakarta.annotation.Nonnull;
2425
import java.io.IOException;
26+
import java.nio.ByteBuffer;
2527
import java.util.Objects;
2628
import org.apache.commons.lang3.StringUtils;
2729
import org.apache.hadoop.hdds.utils.db.Codec;
30+
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
31+
import org.apache.hadoop.hdds.utils.db.CodecException;
2832
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotDiffJobProto;
2933
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
3034
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus;
@@ -310,6 +314,51 @@ private static final class SnapshotDiffJobCodec
310314
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
311315
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
312316

317+
@Override
318+
public boolean supportCodecBuffer() {
319+
return true;
320+
}
321+
322+
@Override
323+
public CodecBuffer toCodecBuffer(@Nonnull SnapshotDiffJob object,
324+
CodecBuffer.Allocator allocator) throws CodecException {
325+
326+
SnapshotDiffJobProto proto = object.toProtoBuf();
327+
final int size = proto.getSerializedSize();
328+
final CodecBuffer buffer = allocator.apply(size);
329+
330+
buffer.put(out -> {
331+
try {
332+
proto.writeTo(out);
333+
return size;
334+
} catch (IOException e) {
335+
throw new IllegalStateException("Failed to write protobuf to buffer", e);
336+
}
337+
});
338+
return buffer;
339+
}
340+
341+
@Override
342+
public SnapshotDiffJob fromCodecBuffer(@Nonnull CodecBuffer buffer)
343+
throws CodecException {
344+
// Direct protobuf parsing from InputStream to avoid byte array copy
345+
try (java.io.InputStream in = buffer.getInputStream()) {
346+
SnapshotDiffJobProto proto = SnapshotDiffJobProto.parseFrom(in);
347+
return SnapshotDiffJob.getFromProtoBuf(proto);
348+
} catch (InvalidProtocolBufferException e) {
349+
ByteBuffer bb = buffer.asReadOnlyByteBuffer();
350+
byte[] data = new byte[bb.remaining()];
351+
bb.get(data);
352+
try {
353+
return MAPPER.readValue(data, SnapshotDiffJob.class);
354+
} catch (IOException ex) {
355+
throw new CodecException("Failed to deserialize SnapshotDiffJob from JSON", ex);
356+
}
357+
} catch (IOException e) {
358+
throw new CodecException("Failed to read from CodecBuffer", e);
359+
}
360+
}
361+
313362
@Override
314363
public Class<SnapshotDiffJob> getTypeClass() {
315364
return SnapshotDiffJob.class;

hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotDiffJobCodec.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package org.apache.hadoop.ozone.om.helpers;
1919

2020
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
2122

23+
import java.nio.ByteBuffer;
2224
import org.apache.hadoop.hdds.utils.db.Codec;
25+
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
2326
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus;
2427
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus;
2528
import org.junit.jupiter.api.Test;
@@ -70,4 +73,88 @@ public void testOldJsonSerializedDataCanBeReadByNewCodec() throws Exception {
7073

7174
assertEquals(0.0, parsed.getKeysProcessedPct());
7275
}
76+
77+
@Test
78+
public void testCodecBufferSupport() throws Exception {
79+
assertTrue(newCodec.supportCodecBuffer());
80+
81+
SnapshotDiffJob original = new SnapshotDiffJob(
82+
System.currentTimeMillis(),
83+
"test-job-buffer",
84+
JobStatus.DONE,
85+
"testVol",
86+
"testBucket",
87+
"fromSnap",
88+
"toSnap",
89+
false,
90+
true,
91+
500L,
92+
SubStatus.OBJECT_ID_MAP_GEN_FSO,
93+
75.5
94+
);
95+
96+
// Test with direct allocator
97+
try (CodecBuffer buffer = newCodec.toCodecBuffer(original, CodecBuffer.Allocator.getDirect())) {
98+
SnapshotDiffJob decoded = newCodec.fromCodecBuffer(buffer);
99+
assertSnapshotDiffJobEquals(original, decoded);
100+
}
101+
102+
// Test with heap allocator
103+
try (CodecBuffer buffer = newCodec.toCodecBuffer(original, CodecBuffer.Allocator.getHeap())) {
104+
SnapshotDiffJob decoded = newCodec.fromCodecBuffer(buffer);
105+
assertSnapshotDiffJobEquals(original, decoded);
106+
}
107+
}
108+
109+
@Test
110+
public void testCodecBufferBackwardCompatibility() throws Exception {
111+
112+
SnapshotDiffJob original = new SnapshotDiffJob(
113+
987654321L,
114+
"compat-job",
115+
JobStatus.FAILED,
116+
"volX",
117+
"buckY",
118+
"oldSnap",
119+
"newSnap",
120+
true,
121+
true,
122+
0L,
123+
null,
124+
0.0
125+
);
126+
original.setReason("Test failure reason");
127+
128+
129+
byte[] jsonData = oldCodec.toPersistedFormatImpl(original);
130+
131+
// Create a CodecBuffer from the JSON data
132+
// This simulates reading old format data from storage
133+
try (CodecBuffer buffer = CodecBuffer.Allocator.getHeap().apply(jsonData.length)) {
134+
buffer.put(ByteBuffer.wrap(jsonData));
135+
136+
// The new codec should handle JSON fallback in fromCodecBuffer
137+
SnapshotDiffJob decoded = newCodec.fromCodecBuffer(buffer);
138+
139+
assertEquals(original.getJobId(), decoded.getJobId());
140+
assertEquals(original.getStatus(), decoded.getStatus());
141+
assertEquals(original.getReason(), decoded.getReason());
142+
}
143+
}
144+
145+
private void assertSnapshotDiffJobEquals(SnapshotDiffJob expected, SnapshotDiffJob actual) {
146+
assertEquals(expected.getCreationTime(), actual.getCreationTime());
147+
assertEquals(expected.getJobId(), actual.getJobId());
148+
assertEquals(expected.getStatus(), actual.getStatus());
149+
assertEquals(expected.getVolume(), actual.getVolume());
150+
assertEquals(expected.getBucket(), actual.getBucket());
151+
assertEquals(expected.getFromSnapshot(), actual.getFromSnapshot());
152+
assertEquals(expected.getToSnapshot(), actual.getToSnapshot());
153+
assertEquals(expected.isForceFullDiff(), actual.isForceFullDiff());
154+
assertEquals(expected.isNativeDiffDisabled(), actual.isNativeDiffDisabled());
155+
assertEquals(expected.getTotalDiffEntries(), actual.getTotalDiffEntries());
156+
assertEquals(expected.getSubStatus(), actual.getSubStatus());
157+
assertEquals(expected.getKeysProcessedPct(), actual.getKeysProcessedPct(), 0.001);
158+
assertEquals(expected.getReason(), actual.getReason());
159+
}
73160
}

0 commit comments

Comments
 (0)