From b1fde6f6bd2b15956859d5896b3006bb5c166e8f Mon Sep 17 00:00:00 2001
From: Sergey Soldatov
Date: Wed, 18 Mar 2026 09:25:11 -0700
Subject: [PATCH] HDDS-14752. ReadBlock: Close stream immediately when position >= blockLength

read() now also converts the buffered byte with Byte.toUnsignedInt so that a
0xFF data byte is returned as 255 instead of -1, which callers would
otherwise treat as EOF.
---
 .../scm/storage/StreamBlockInputStream.java   |  20 ++-
 .../storage/TestStreamBlockInputStream.java   | 136 +++++++++++++++++-
 2 files changed, 147 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
index cac3f75c5d35..e608bd5a126a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -131,8 +131,10 @@ public synchronized int read() throws IOException {
     if (!dataAvailableToRead(1, true)) {
       return EOF;
     }
-    position++;
-    return buffer.get();
+    // Widen as unsigned (0-255) so a 0xFF data byte is not mistaken for EOF.
+    int value = Byte.toUnsignedInt(buffer.get());
+    advancePosition(1);
+    return value;
   }
 
   @Override
@@ -158,7 +160,7 @@ synchronized int readFully(ByteBuffer targetBuf, boolean preRead) throws IOExcep
       tmpBuf.limit(tmpBuf.position() + toCopy);
       targetBuf.put(tmpBuf);
       buffer.position(tmpBuf.position());
-      position += toCopy;
+      advancePosition(toCopy);
       read += toCopy;
     }
     return read > 0 ? read : EOF;
@@ -177,6 +179,18 @@ private synchronized boolean dataAvailableToRead(int length, boolean preRead) th
     return bufferHasRemaining();
   }
 
+  /**
+   * Advances the read position by {@code delta} bytes and calls
+   * {@code closeStream()} as soon as the position reaches the block length
+   * while a streaming reader is still set.
+   */
+  private synchronized void advancePosition(long delta) {
+    position += delta;
+    if (position >= blockLength && streamingReader != null) {
+      closeStream();
+    }
+  }
+
   private synchronized boolean bufferHasRemaining() {
     return buffer != null && buffer.hasRemaining();
   }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
index 568f7a3b918c..2f212ccfe5ff 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
@@ -18,16 +18,38 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.function.Function;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamingReadResponse;
+import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -35,16 +57,18 @@
  */
 public class TestStreamBlockInputStream {
 
+  private static final Duration STREAM_READ_TIMEOUT = Duration.ofSeconds(5);
+  private static final Function NO_REFRESH = b -> null;
+
   @Test
   public void testCustomStreamReadConfigIsApplied() throws Exception {
     // Arrange: create a config with non-default values
-    OzoneClientConfig clientConfig = new OzoneClientConfig();
+    OzoneClientConfig clientConfig = newStreamReadConfig();
     clientConfig.setStreamReadPreReadSize(64L << 20);
     clientConfig.setStreamReadResponseDataSize(2 << 20);
-    clientConfig.setStreamReadTimeout(Duration.ofSeconds(5));
 
     // Sanity check
-    assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());
+    assertEquals(STREAM_READ_TIMEOUT, clientConfig.getStreamReadTimeout());
     // Create a dummy BlockID for the test
     BlockID blockID = new BlockID(1L, 1L);
     long length = 1024L;
@@ -54,16 +78,116 @@ public void testCustomStreamReadConfigIsApplied() throws Exception {
     Token token = null;
     // Mock XceiverClientFactory since StreamBlockInputStream requires it in the constructor
     XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class);
-    Function refreshFunction = b -> null;
     // Create a StreamBlockInputStream instance
     try (StreamBlockInputStream sbis = new StreamBlockInputStream(
         blockID, length, pipeline, token,
-        xceiverClientFactory, refreshFunction, clientConfig)) {
+        xceiverClientFactory, NO_REFRESH, clientConfig)) {
 
       // Assert: fields should match config values
       assertEquals(64L << 20, sbis.getPreReadSize());
       assertEquals(2 << 20, sbis.getResponseDataSize());
-      assertEquals(Duration.ofSeconds(5), sbis.getReadTimeout());
+      assertEquals(STREAM_READ_TIMEOUT, sbis.getReadTimeout());
     }
   }
+
+  /**
+   * Reads a block to its end and checks that the streaming read is completed
+   * exactly once, as soon as the final byte has been consumed.
+   */
+  @Test
+  public void testReleasesStreamPermitAtBlockEof() throws Exception {
+    OzoneClientConfig clientConfig = newStreamReadConfig();
+    BlockID blockID = new BlockID(1L, 2L);
+    // Last byte is 0xFF: a sign-extending read() would return -1 and look like EOF.
+    byte[] data = new byte[] {1, 2, 3, (byte) 0xFF};
+    long length = data.length;
+    Pipeline pipeline = mockStandalonePipeline();
+    XceiverClientGrpc xceiverClient = mockStreamingReadClient(data);
+    XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class);
+    when(xceiverClientFactory.acquireClientForReadData(any(Pipeline.class)))
+        .thenReturn(xceiverClient);
+
+    try (StreamBlockInputStream sbis = new StreamBlockInputStream(
+        blockID, length, pipeline, null, xceiverClientFactory,
+        NO_REFRESH, clientConfig)) {
+      ByteBuffer firstRead = ByteBuffer.allocate((int) length - 1);
+      int first = sbis.read(firstRead);
+      assertEquals(length - 1, first);
+      assertEquals(length - 1, sbis.getPos());
+      verify(xceiverClient, never()).completeStreamRead();
+
+      int last = sbis.read();
+      assertEquals(data[(int) length - 1] & 0xFF, last);
+      assertEquals(length, sbis.getPos());
+      verify(xceiverClient, times(1)).completeStreamRead();
+
+      // Subsequent reads should return EOF and must not trigger duplicate permit release.
+      assertEquals(-1, sbis.read());
+      assertEquals(-1, sbis.read());
+    }
+
+    verify(xceiverClient, times(1)).completeStreamRead();
+  }
+
+  /** Builds a config with checksum verification off, pre-read size 0 and the shared read timeout. */
+  private OzoneClientConfig newStreamReadConfig() {
+    OzoneClientConfig clientConfig = new OzoneClientConfig();
+    clientConfig.setChecksumVerify(false);
+    clientConfig.setStreamReadPreReadSize(0);
+    clientConfig.setStreamReadResponseDataSize(1024);
+    clientConfig.setStreamReadTimeout(STREAM_READ_TIMEOUT);
+    return clientConfig;
+  }
+
+  /** Mocks a STAND_ALONE pipeline whose node accessors all return one mocked datanode. */
+  private Pipeline mockStandalonePipeline() throws Exception {
+    Pipeline pipeline = mock(Pipeline.class);
+    DatanodeDetails datanode = mock(DatanodeDetails.class);
+
+    when(pipeline.getNodes()).thenReturn(Collections.singletonList(datanode));
+    when(pipeline.getNodesInOrder()).thenReturn(Collections.singletonList(datanode));
+    when(pipeline.getFirstNode()).thenReturn(datanode);
+    when(pipeline.getClosestNode()).thenReturn(datanode);
+    when(pipeline.getType()).thenReturn(HddsProtos.ReplicationType.STAND_ALONE);
+    when(pipeline.getReplicaIndex(datanode)).thenReturn(1);
+    when(datanode.getID()).thenReturn(mock(DatanodeID.class));
+    when(datanode.getUuidString()).thenReturn("00000000-0000-0000-0000-000000000001");
+
+    return pipeline;
+  }
+
+  /** Mocks a client whose initStreamRead hands {@code data} to the reader as one SUCCESS ReadBlock response. */
+  private XceiverClientGrpc mockStreamingReadClient(byte[] data) throws Exception {
+    XceiverClientGrpc xceiverClient = mock(XceiverClientGrpc.class);
+    StreamingReadResponse streamingReadResponse = mock(StreamingReadResponse.class);
+    ReadBlockResponseProto readBlock = buildReadBlockResponse(data);
+
+    doNothing().when(xceiverClient)
+        .streamRead(any(ContainerCommandRequestProto.class),
+            any(StreamingReadResponse.class));
+    doAnswer(invocation -> {
+      StreamingReaderSpi reader = invocation.getArgument(1);
+      reader.setStreamingReadResponse(streamingReadResponse);
+      reader.onNext(ContainerCommandResponseProto.newBuilder()
+          .setCmdType(Type.ReadBlock)
+          .setResult(ContainerProtos.Result.SUCCESS)
+          .setReadBlock(readBlock)
+          .build());
+      return null;
+    }).when(xceiverClient).initStreamRead(any(BlockID.class), any());
+
+    return xceiverClient;
+  }
+
+  /** Wraps {@code data} in a ReadBlock response at offset 0 with checksum type NONE. */
+  private ReadBlockResponseProto buildReadBlockResponse(byte[] data) {
+    return ReadBlockResponseProto.newBuilder()
+        .setOffset(0)
+        .setData(ByteString.copyFrom(data))
+        .setChecksumData(ChecksumData.newBuilder()
+            .setType(ContainerProtos.ChecksumType.NONE)
+            .setBytesPerChecksum(data.length)
+            .build())
+        .build();
+  }
 }