2525import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos ;
2626import org .apache .hadoop .ozone .common .ChecksumByteBuffer ;
2727import org .apache .hadoop .ozone .common .ChecksumByteBufferFactory ;
28+ import org .apache .hadoop .ozone .container .common .helpers .BlockData ;
2829import org .apache .ratis .thirdparty .com .google .protobuf .ByteString ;
2930
3031/**
@@ -63,9 +64,13 @@ public ContainerMerkleTreeWriter(ContainerProtos.ContainerMerkleTree fromTree) {
6364 id2Block = new TreeMap <>();
6465 for (ContainerProtos .BlockMerkleTree blockTree : fromTree .getBlockMerkleTreeList ()) {
6566 long blockID = blockTree .getBlockID ();
66- addBlock (blockID );
67- for (ContainerProtos .ChunkMerkleTree chunkTree : blockTree .getChunkMerkleTreeList ()) {
68- addChunks (blockID , chunkTree );
67+ if (blockTree .getDeleted ()) {
68+ setDeletedBlock (blockID , blockTree .getDataChecksum ());
69+ } else {
70+ addBlock (blockID );
71+ for (ContainerProtos .ChunkMerkleTree chunkTree : blockTree .getChunkMerkleTreeList ()) {
72+ addChunks (blockID , new ChunkMerkleTreeWriter (chunkTree ));
73+ }
6974 }
7075 }
7176 }
@@ -81,7 +86,7 @@ public ContainerMerkleTreeWriter(ContainerProtos.ContainerMerkleTree fromTree) {
8186 */
8287 public void addChunks (long blockID , boolean checksumMatches , Collection <ContainerProtos .ChunkInfo > chunks ) {
8388 for (ContainerProtos .ChunkInfo chunk : chunks ) {
84- addChunks (blockID , checksumMatches , chunk );
89+ addChunks (blockID , new ChunkMerkleTreeWriter ( chunk , checksumMatches ) );
8590 }
8691 }
8792
@@ -91,12 +96,6 @@ public void addChunks(long blockID, boolean checksumMatches, ContainerProtos.Chu
9196 }
9297 }
9398
94- private void addChunks (long blockID , ContainerProtos .ChunkMerkleTree ... chunks ) {
95- for (ContainerProtos .ChunkMerkleTree chunkTree : chunks ) {
96- addChunks (blockID , new ChunkMerkleTreeWriter (chunkTree ));
97- }
98- }
99-
10099 private void addChunks (long blockID , ChunkMerkleTreeWriter chunkWriter ) {
101100 id2Block .computeIfAbsent (blockID , BlockMerkleTreeWriter ::new ).addChunks (chunkWriter );
102101 }
@@ -111,13 +110,93 @@ public void addBlock(long blockID) {
111110 id2Block .computeIfAbsent (blockID , BlockMerkleTreeWriter ::new );
112111 }
113112
113+ /**
114+ * Creates a deleted block entry in the merkle tree and assigns the block this fixed checksum.
115+ * If the block already exists with child data it is overwritten.
116+ *
117+ * This method is used on the reconciliation path to update the data checksum used for a deleted block based on a
118+ * peer's value.
119+ */
120+ public void setDeletedBlock (long blockID , long dataChecksum ) {
121+ BlockMerkleTreeWriter blockWriter = new BlockMerkleTreeWriter (blockID );
122+ blockWriter .markDeleted (dataChecksum );
123+ id2Block .put (blockID , blockWriter );
124+ }
125+
126+ /**
127+ * Merges the content from the provided tree with this tree writer.
128+ * Conflicts where this tree writer and the incoming existingTree parameter have an entry for the same block are
129+ * resolved in the following manner:
130+ * - A deleted block supersedes a live block
131+ * - Data cannot be un-deleted, so if a delete is ever witnessed, that is the state the block should converge to.
132+ * - If both blocks are either deleted or live, the value in this writer supersedes the value in the existingTree
133+ * parameter.
134+ * - Our writer has the last witnessed information that is going to be persisted after this merge.
135+ *
136+ * For example, consider the case where a peer has deleted a block and we have a corrupt copy that has not yet been
137+ * deleted. When we reconcile with this peer, we will mark the block as deleted and use the peer's checksum in our
138+ * merkle tree to make the trees converge. The "fix" for corrupted data that is supposed to be deleted is to delete
139+ * it. After this, if the scanner runs again before the block is deleted, we don't want to update the tree with the
140+ * scanner's value because it would again diverge from the peer due to data that is expected to be deleted.
141+ * This would cause the checksum to oscillate back and forth until the block is deleted, instead of converging.
142+ */
143+ public ContainerProtos .ContainerMerkleTree update (ContainerProtos .ContainerMerkleTree existingTree ) {
144+ for (ContainerProtos .BlockMerkleTree existingBlockTree : existingTree .getBlockMerkleTreeList ()) {
145+ long blockID = existingBlockTree .getBlockID ();
146+ BlockMerkleTreeWriter ourBlockTree = id2Block .get (blockID );
147+ if (ourBlockTree != null ) {
148+ // both trees contain the block. We will only consider the incoming/existing value if it does not match our
149+ // current state
150+ if (!ourBlockTree .isDeleted () && existingBlockTree .getDeleted ()) {
151+ setDeletedBlock (blockID , existingBlockTree .getDataChecksum ());
152+ }
153+ // In all other cases, keep using our writer's value over the existing one because either:
154+ // - The deleted states match between the two blocks OR
155+ // - Our block is deleted and the existing one is not, so we have the latest value to use.
156+ } else if (existingBlockTree .getDeleted ()) {
157+ // Our tree does not have this block. Only take the value if it is deleted.
158+ // The definitive set of live blocks will come from this tree writer.
159+ setDeletedBlock (blockID , existingBlockTree .getDataChecksum ());
160+ }
161+ }
162+ return toProtoBuilder ().build ();
163+ }
164+
165+ /**
166+ * Adds deleted blocks to this merkle tree. The blocks' checksums are computed from the checksums in the BlockData.
167+ * If a block with the same ID already exists in the tree, it is overwritten as deleted with the checksum computed
168+ * from the chunk checksums in the BlockData. If we reconciled with a peer and already marked this block as deleted
169+ * during that process, this will overwrite that value. If it changes the block's checksum from what the peer had,
170+ * one more round of reconciliation may be required to bring them in sync.
171+ *
172+ * The top level container data checksum is only computed in the returned tree proto if computeChecksum is true.
173+ * If it is false, the resulting tree proto will have data checksums for each block, but an empty/unset data checksum
174+ * for the container at the root of the tree.
175+ */
176+ public ContainerProtos .ContainerMerkleTree addDeletedBlocks (Collection <BlockData > blocks , boolean computeChecksum ) {
177+ for (BlockData block : blocks ) {
178+ long blockID = block .getLocalID ();
179+ BlockMerkleTreeWriter blockWriter = new BlockMerkleTreeWriter (blockID );
180+ for (ContainerProtos .ChunkInfo chunkInfo : block .getChunks ()) {
181+ blockWriter .addChunks (new ChunkMerkleTreeWriter (chunkInfo , true ));
182+ }
183+ blockWriter .markDeleted ();
184+ id2Block .put (blockID , blockWriter );
185+ }
186+ ContainerProtos .ContainerMerkleTree .Builder protoBuilder = toProtoBuilder ();
187+ if (!computeChecksum ) {
188+ protoBuilder .clearDataChecksum ();
189+ }
190+ return protoBuilder .build ();
191+ }
192+
114193 /**
115194 * Uses chunk hashes to compute all remaining hashes in the tree, and returns it as a protobuf object. No checksum
116195 * computation for the tree happens outside of this method.
117196 *
118197 * @return A complete protobuf object representation of this tree.
119198 */
120- public ContainerProtos .ContainerMerkleTree toProto () {
199+ private ContainerProtos .ContainerMerkleTree . Builder toProtoBuilder () {
121200 // Compute checksums and return the result.
122201 ContainerProtos .ContainerMerkleTree .Builder containerTreeBuilder = ContainerProtos .ContainerMerkleTree .newBuilder ();
123202 ChecksumByteBuffer checksumImpl = CHECKSUM_BUFFER_SUPPLIER .get ();
@@ -133,8 +212,11 @@ public ContainerProtos.ContainerMerkleTree toProto() {
133212 checksumImpl .update (containerChecksumBuffer );
134213
135214 return containerTreeBuilder
136- .setDataChecksum (checksumImpl .getValue ())
137- .build ();
215+ .setDataChecksum (checksumImpl .getValue ());
216+ }
217+
218+ public ContainerProtos .ContainerMerkleTree toProto () {
219+ return toProtoBuilder ().build ();
138220 }
139221
140222 /**
@@ -145,10 +227,22 @@ private static class BlockMerkleTreeWriter {
145227 // Chunk order in the checksum is determined by their offset.
146228 private final SortedMap <Long , ChunkMerkleTreeWriter > offset2Chunk ;
147229 private final long blockID ;
230+ private boolean deleted ;
231+ private Long dataChecksum ;
148232
149233 BlockMerkleTreeWriter (long blockID ) {
150234 this .blockID = blockID ;
151235 this .offset2Chunk = new TreeMap <>();
236+ this .deleted = false ;
237+ }
238+
239+ public void markDeleted (long deletedDataChecksum ) {
240+ this .deleted = true ;
241+ this .dataChecksum = deletedDataChecksum ;
242+ }
243+
244+ public void markDeleted () {
245+ this .deleted = true ;
152246 }
153247
154248 /**
@@ -163,6 +257,10 @@ public void addChunks(ChunkMerkleTreeWriter... chunks) {
163257 }
164258 }
165259
260+ public boolean isDeleted () {
261+ return deleted ;
262+ }
263+
166264 /**
167265 * Uses chunk hashes to compute a block hash for this tree, and returns it as a protobuf object. All block checksum
168266 * computation for the tree happens within this method.
@@ -171,6 +269,23 @@ public void addChunks(ChunkMerkleTreeWriter... chunks) {
171269 */
172270 public ContainerProtos .BlockMerkleTree toProto () {
173271 ContainerProtos .BlockMerkleTree .Builder blockTreeBuilder = ContainerProtos .BlockMerkleTree .newBuilder ();
272+ if (dataChecksum != null ) {
273+ blockTreeBuilder .setDataChecksum (dataChecksum );
274+ } else {
275+ setDataChecksumFromChunks (blockTreeBuilder );
276+ }
277+
278+ if (deleted ) {
279+ blockTreeBuilder .clearChunkMerkleTree ();
280+ }
281+
282+ return blockTreeBuilder
283+ .setBlockID (blockID )
284+ .setDeleted (deleted )
285+ .build ();
286+ }
287+
288+ private void setDataChecksumFromChunks (ContainerProtos .BlockMerkleTree .Builder blockTreeBuilder ) {
174289 ChecksumByteBuffer checksumImpl = CHECKSUM_BUFFER_SUPPLIER .get ();
175290 // Allocate space for block ID + all chunk checksums
176291 ByteBuffer blockChecksumBuffer = ByteBuffer .allocate (Long .BYTES * (1 + offset2Chunk .size ()));
@@ -189,11 +304,7 @@ public ContainerProtos.BlockMerkleTree toProto() {
189304 }
190305 blockChecksumBuffer .flip ();
191306 checksumImpl .update (blockChecksumBuffer );
192-
193- return blockTreeBuilder
194- .setBlockID (blockID )
195- .setDataChecksum (checksumImpl .getValue ())
196- .build ();
307+ blockTreeBuilder .setDataChecksum (checksumImpl .getValue ());
197308 }
198309 }
199310
0 commit comments