diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index b057efb1ff83..32da41f8f17d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -505,8 +505,8 @@ StatusAndMessages queryUpgradeFinalizationProgress( long getContainerCount(HddsProtos.LifeCycleState state) throws IOException; - List getListOfContainers( - long startContainerID, int count, HddsProtos.LifeCycleState state) + List getListOfContainerIDs( + ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) throws IOException; DecommissionScmResponseProto decommissionScm( diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 01e3d709cf02..ca9424657700 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -36,6 +36,7 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.client.ECReplicationConfig; @@ -109,6 +110,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SafeModeRuleStatusProto; @@ -1250,10 +1253,30 @@ public void close() { } @Override - public List getListOfContainers( - long startContainerID, int count, HddsProtos.LifeCycleState state) + public List getListOfContainerIDs( + ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) throws IOException { - return listContainer(startContainerID, count, state).getContainerInfoList(); + Preconditions.checkState(startContainerID.getId() >= 0, + "Container ID cannot be negative."); + Preconditions.checkState(count > 0, + "Container count must be greater than 0."); + SCMListContainerIDsRequestProto.Builder builder = SCMListContainerIDsRequestProto + .newBuilder(); + builder.setStartContainerID(startContainerID.getProtobuf()); + builder.setCount(count); + builder.setTraceID(TracingUtil.exportCurrentSpan()); + builder.setState(state); + + SCMListContainerIDsRequestProto request = builder.build(); + + SCMListContainerIDsResponseProto response = + submitRequest(Type.ListContainerIDs, + builder1 -> builder1.setScmListContainerIDsRequest(request)) + .getScmListContainerIDsResponse(); + return response.getContainerIDsList() + .stream() + .map(ContainerID::getFromProtobuf) + 
.collect(Collectors.toList()); } @Override diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index 455f048d8b5a..b6508ca9688d 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -87,6 +87,7 @@ message ScmContainerLocationRequest { optional ContainerBalancerStatusInfoRequestProto containerBalancerStatusInfoRequest = 48; optional ReconcileContainerRequestProto reconcileContainerRequest = 49; optional GetDeletedBlocksTxnSummaryRequestProto getDeletedBlocksTxnSummaryRequest = 50; + optional SCMListContainerIDsRequestProto scmListContainerIDsRequest = 51; } message ScmContainerLocationResponse { @@ -145,6 +146,7 @@ message ScmContainerLocationResponse { optional ContainerBalancerStatusInfoResponseProto containerBalancerStatusInfoResponse = 48; optional ReconcileContainerResponseProto reconcileContainerResponse = 49; optional GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummaryResponse = 50; + optional SCMListContainerIDsResponseProto scmListContainerIDsResponse = 51; enum Status { OK = 1; @@ -202,6 +204,7 @@ enum Type { GetContainerBalancerStatusInfo = 44; ReconcileContainer = 45; GetDeletedBlocksTransactionSummary = 46; + ListContainerIDs = 47; } /** @@ -291,6 +294,17 @@ message GetExistContainerWithPipelinesInBatchResponseProto { repeated ContainerWithPipeline containerWithPipelines = 1; } +message SCMListContainerIDsRequestProto { + required uint32 count = 1; + optional ContainerID startContainerID = 2; + optional LifeCycleState state = 3; + optional string traceID = 4; +} + +message SCMListContainerIDsResponseProto { + repeated ContainerID containerIDs = 1; +} + message SCMListContainerRequestProto { required uint32 count = 1; optional uint64 startContainerID = 2; diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index 36753202ec91..3c5706cc0fb9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -55,16 +55,33 @@ default List getContainers() { return getContainers(ContainerID.valueOf(0), Integer.MAX_VALUE); } + /** + * Returns container IDs under certain conditions. + * Search container IDs from start ID(inclusive), + * The max size of the searching range cannot exceed the + * value of count. + * + * @param startID start containerID, >=0, + * start searching at the head if 0. + * @param count count must be >= 0 + * Usually the count will be replaced with a very big + * value instead of being unlimited in case the db is very big. + * @param state container state + * + * @return a list of container IDs. + */ + List getContainerIDs(ContainerID startID, int count, LifeCycleState state); + /** * Returns containers under certain conditions. - * Search container IDs from start ID(exclusive), + * Search container IDs from start ID(inclusive), * The max size of the searching range cannot exceed the * value of count. * * @param startID start containerID, >=0, * start searching at the head if 0. * @param count count must be >= 0 - * Usually the count will be replace with a very big + * Usually the count will be replaced with a very big * value instead of being unlimited in case the db is very big. * * @return a list of container. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index f77bf86cec1a..432c9890e98a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -141,6 +141,14 @@ public List getContainers(ReplicationType type) { return containerStateManager.getContainerInfos(type); } + @Override + public List getContainerIDs(final ContainerID startID, + final int count, + final LifeCycleState state) { + scmContainerManagerMetrics.incNumListContainersOps(); + return containerStateManager.getContainerIDs(state, startID, count); + } + @Override public List getContainers(final ContainerID startID, final int count) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 3809db1cd335..0d66027480d2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -103,6 +103,15 @@ public interface ContainerStateManager { */ boolean contains(ContainerID containerID); + /** + * Get {@link ContainerID}s for the given state. + * + * @param start the start {@link ContainerID} (inclusive) + * @param count the size limit + * @return a list of {@link ContainerID}; + */ + List getContainerIDs(LifeCycleState state, ContainerID start, int count); + /** * Get {@link ContainerInfo}s. 
* diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java index dc5afd43a20b..5c95aff5c190 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -266,6 +266,13 @@ private void initialize() throws IOException { return actions; } + @Override + public List getContainerIDs(LifeCycleState state, ContainerID start, int count) { + try (AutoCloseableLock ignored = readLock()) { + return containers.getContainerIDs(state, start, count); + } + } + @Override public List getContainerInfos(ContainerID start, int count) { try (AutoCloseableLock ignored = readLock()) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index b1ff5f4ae488..4dd93aef7473 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -260,6 +260,20 @@ public void updateState(ContainerID containerID, LifeCycleState currentState, currentInfo.setState(newState); } + /** + * + * @param state the state of the containers + * @param start the start id + * @param count the maximum size of the returned list + * @return a list of sorted {@link ContainerID}s + */ + public List getContainerIDs(LifeCycleState state, ContainerID start, int count) { + Preconditions.assertTrue(count >= 0, "count < 0"); + return lifeCycleStateMap.tailMap(state, start).keySet().stream() + .limit(count) + .collect(Collectors.toList()); + } + public List 
getContainerInfos(ContainerID start, int count) { return containerMap.getInfos(start, count); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 2765cabec039..dd18ad68d13a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -117,6 +117,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SafeModeRuleStatusProto; @@ -752,6 +754,12 @@ public ScmContainerLocationResponse processRequest( .setStatus(Status.OK) .setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest())) .build(); + case ListContainerIDs: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + 
.setScmListContainerIDsResponse(listContainerIDs(request.getScmListContainerIDsRequest())) + .build(); default: throw new IllegalArgumentException( "Unknown command type: " + request.getCmdType()); @@ -1401,4 +1409,29 @@ public ReconcileContainerResponseProto reconcileContainer(ReconcileContainerRequ return ReconcileContainerResponseProto.getDefaultInstance(); } + public SCMListContainerIDsResponseProto listContainerIDs( + SCMListContainerIDsRequestProto request) throws IOException { + ContainerID startContainerID = ContainerID.valueOf(0); + + if (request.hasStartContainerID()) { + startContainerID = ContainerID.valueOf(request.getStartContainerID().getId()); + } + + HddsProtos.LifeCycleState state = null; + if (request.hasState()) { + state = request.getState(); + } + + SCMListContainerIDsResponseProto.Builder builder = + SCMListContainerIDsResponseProto.newBuilder(); + + List containerIDs = impl.getListOfContainerIDs( + startContainerID, request.getCount(), state); + + containerIDs.stream() + .map(ContainerID::getProtobuf) + .forEach(builder::addContainerIDs); + + return builder.build(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index f500dc82330c..b161e0e84d76 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1502,7 +1502,7 @@ public long getContainerCount(HddsProtos.LifeCycleState state) auditMap.put("state", String.valueOf(state)); try { - long count = scm.getContainerManager().getContainers(state).size(); + long count = scm.getContainerManager().getContainerStateCount(state); AUDIT.logReadSuccess(buildAuditMessageForSuccess( SCMAction.GET_CONTAINER_COUNT, auditMap)); return count; @@ -1514,8 +1514,8 @@ public long 
getContainerCount(HddsProtos.LifeCycleState state) } @Override - public List getListOfContainers( - long startContainerID, int count, HddsProtos.LifeCycleState state) + public List getListOfContainerIDs( + ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) throws IOException { final Map auditMap = Maps.newHashMap(); @@ -1523,14 +1523,14 @@ public List getListOfContainers( auditMap.put("count", String.valueOf(count)); auditMap.put("state", String.valueOf(state)); try { - List results = scm.getContainerManager().getContainers( - ContainerID.valueOf(startContainerID), count, state); + List results = scm.getContainerManager().getContainerIDs( + startContainerID, count, state); AUDIT.logReadSuccess(buildAuditMessageForSuccess( - SCMAction.LIST_CONTAINER, auditMap)); + SCMAction.LIST_CONTAINER_IDS, auditMap)); return results; } catch (Exception ex) { AUDIT.logReadFailure(buildAuditMessageForFailure( - SCMAction.LIST_CONTAINER, auditMap, ex)); + SCMAction.LIST_CONTAINER_IDS, auditMap, ex)); throw ex; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index 52cd943c4dbb..b7acb40d7ac8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -68,7 +68,8 @@ public enum SCMAction implements AuditAction { QUERY_NODE, GET_PIPELINE, RECONCILE_CONTAINER, - GET_DELETED_BLOCK_SUMMARY; + GET_DELETED_BLOCK_SUMMARY, + LIST_CONTAINER_IDS; @Override public String getAction() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java index 5c3035f28fc7..182a589382a3 100644 --- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -371,6 +371,37 @@ private void verifyContainerState(ContainerID containerId, assertEquals(expectedState, containerManager.getContainer(containerId).getState()); } + @Test + public void testGetContainerIDs() throws IOException { + ContainerInfo openContainerInfo = new ContainerInfo.Builder() + .setContainerID(1) + .setState(HddsProtos.LifeCycleState.OPEN) + .setSequenceId(100L) + .setOwner("scm") + .setPipelineID(PipelineID.randomId()) + .setReplicationConfig( + RatisReplicationConfig + .getInstance(ReplicationFactor.THREE)) + .build(); + + ContainerInfo closedContainerInfo = new ContainerInfo.Builder() + .setContainerID(2) + .setState(HddsProtos.LifeCycleState.CLOSED) + .setSequenceId(200L) + .setOwner("scm") + .setPipelineID(PipelineID.randomId()) + .setReplicationConfig( + RatisReplicationConfig + .getInstance(ReplicationFactor.THREE)) + .build(); + + containerStateManager.addContainer(openContainerInfo.getProtobuf()); + containerStateManager.addContainer(closedContainerInfo.getProtobuf()); + + assertEquals(1, containerStateManager.getContainerIDs( + HddsProtos.LifeCycleState.CLOSED, ContainerID.MIN, 10).size()); + } + @Test public void testSequenceIdOnStateUpdate() throws Exception { ContainerID containerID = ContainerID.valueOf(3L); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java new file mode 100644 index 000000000000..c38c3c211bd9 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.states; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.junit.jupiter.api.Test; + +class TestContainerStateMap { + + @Test + void testGetContainerIDs() { + ContainerStateMap map = new ContainerStateMap(); + + List containerInfos = containerInfos(); + + // initialize map + containerInfos.forEach(map::addContainer); + + assertEquals(4, map.getContainerIDs(OPEN, ContainerID.MIN, containerInfos.size()).size()); + assertEquals(4, map.getContainerIDs(CLOSED, 
ContainerID.MIN, containerInfos.size()).size()); + + // verify pagination + assertEquals(3, map.getContainerIDs(CLOSED, ContainerID.MIN, 3).size()); + assertEquals(3, map.getContainerIDs(CLOSED, ContainerID.valueOf(7), 3).size()); + } + + private List containerInfos() { + return Arrays.asList( + buildContainerInfo(1, OPEN), + buildContainerInfo(2, CLOSED), + buildContainerInfo(3, QUASI_CLOSED), + buildContainerInfo(4, DELETED), + buildContainerInfo(5, OPEN), + buildContainerInfo(6, OPEN), + buildContainerInfo(7, CLOSED), + buildContainerInfo(8, CLOSED), + buildContainerInfo(9, CLOSED), + buildContainerInfo(10, OPEN) + ); + } + + private ContainerInfo buildContainerInfo(long containerID, HddsProtos.LifeCycleState state) { + return new ContainerInfo.Builder() + .setContainerID(containerID) + .setState(state) + .setReplicationConfig(StandaloneReplicationConfig.getInstance(THREE)) + .build(); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java index 9402218014ce..7d2f399d1faf 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java @@ -17,10 +17,12 @@ package org.apache.hadoop.hdds.scm.server; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,6 +35,7 @@ import 
org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; @@ -130,12 +133,27 @@ public void testScmListContainer() throws Exception { SCMClientProtocolServer scmServer = new SCMClientProtocolServer(new OzoneConfiguration(), mockStorageContainerManager(), mock(ReconfigurationHandler.class)); + try { + assertEquals(10, scmServer.listContainer(1, 10, + null, HddsProtos.ReplicationType.RATIS, null).getContainerInfoList().size()); + // Test call from a legacy client, which uses a different method of listContainer + assertEquals(10, scmServer.listContainer(1, 10, null, + HddsProtos.ReplicationFactor.THREE).getContainerInfoList().size()); + } finally { + scmServer.stop(); + } + } - assertEquals(10, scmServer.listContainer(1, 10, - null, HddsProtos.ReplicationType.RATIS, null).getContainerInfoList().size()); - // Test call from a legacy client, which uses a different method of listContainer - assertEquals(10, scmServer.listContainer(1, 10, null, - HddsProtos.ReplicationFactor.THREE).getContainerInfoList().size()); + @Test + public void testScmGetContainerCount() throws IOException { + SCMClientProtocolServer scmServer = + new SCMClientProtocolServer(new OzoneConfiguration(), + mockStorageContainerManager(), mock(ReconfigurationHandler.class)); + try { + assertEquals(10, scmServer.getContainerCount(CLOSED)); + } finally { + scmServer.stop(); + } } private StorageContainerManager mockStorageContainerManager() { @@ -145,11 +163,12 @@ private StorageContainerManager mockStorageContainerManager() { } ContainerManagerImpl 
containerManager = mock(ContainerManagerImpl.class); when(containerManager.getContainers()).thenReturn(infos); + when(containerManager.getContainerStateCount(any(LifeCycleState.class))).thenReturn(infos.size()); StorageContainerManager storageContainerManager = mock(StorageContainerManager.class); when(storageContainerManager.getContainerManager()).thenReturn(containerManager); SCMNodeDetails scmNodeDetails = mock(SCMNodeDetails.class); - when(scmNodeDetails.getClientProtocolServerAddress()).thenReturn(new InetSocketAddress("localhost", 9876)); + when(scmNodeDetails.getClientProtocolServerAddress()).thenReturn(new InetSocketAddress("localhost", 0)); when(scmNodeDetails.getClientProtocolServerAddressKey()).thenReturn("test"); when(storageContainerManager.getScmNodeDetails()).thenReturn(scmNodeDetails); return storageContainerManager; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 98ccd8fac8be..be9f1ac00e96 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -113,6 +113,7 @@ private void addPropertiesNotInXml() { ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY, ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY, ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM, + ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE, OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, OMConfigKeys.OZONE_OM_HA_PREFIX, OMConfigKeys.OZONE_OM_GRPC_PORT_KEY, diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index a57095d2e4c7..b4da42d8f03a 100644 --- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -221,6 +221,38 @@ public final class ReconServerConfigKeys { "ozone.recon.dn.metrics.collection.timeout"; public static final String OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT = "10m"; + /** + * Application-level ceiling on the number of ContainerIDs fetched from SCM + * per RPC call during container sync. The effective batch size is + * {@code min(this value, ipc.maximum.data.length / 12, totalContainerCount)}, + * so raising this above the default is only meaningful if + * {@code ipc.maximum.data.length} has also been raised from its default. + * + *

<p>Recon wire cost: each ContainerID is ~12 bytes on the wire, so + * the default 1,000,000 produces ~12 MB per RPC. + * + *

<p>Recon JVM heap: each deserialized {@code ContainerID} object + * occupies ~32 bytes, so the default batch requires ~32 MB of heap on Recon. + * Reduce this value on memory-constrained Recon nodes. + * + *

<p>SCM-side pressure: on each RPC call SCM holds its container + * state read lock (a fair {@link java.util.concurrent.locks.ReentrantReadWriteLock}) + * for the full duration of streaming N entries from its in-memory + * {@link java.util.TreeMap} and collecting them into a response list. + * Because the lock is fair, any concurrent write (container allocation, + * state transition) queuing for the write lock will be blocked for the + * entire batch duration — and new reads queue behind that waiting writer. + * Larger batches therefore increase worst-case container-allocation latency + * on SCM during sync. On write-heavy SCM nodes, prefer smaller batches with + * more calls over fewer large batches. + * + *
<p>
Default: 1,000,000 (~12 MB wire, ~32 MB JVM heap per batch on Recon; + * 4 calls for a 4 M-container cluster) + */ + public static final String OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE = + "ozone.recon.scm.container.id.batch.size"; + public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000; + /** * Private constructor for utility class. */ diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index bc6d4943ecdf..278bac0011dc 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -62,18 +62,15 @@ import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.ScmUtils; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; @@ -140,9 +137,6 @@ public class ReconStorageContainerManagerFacade private static final Logger LOG = LoggerFactory .getLogger(ReconStorageContainerManagerFacade.class); - public static final long CONTAINER_METADATA_SIZE = 1 * 1024 * 1024L; - private static final String IPC_MAXIMUM_DATA_LENGTH = "ipc.maximum.data.length"; - private static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 128 * 1024 * 1024; private final OzoneConfiguration ozoneConfiguration; private final ReconDatanodeProtocolServer datanodeProtocolServer; @@ -173,6 +167,7 @@ public class ReconStorageContainerManagerFacade private AtomicBoolean isSyncDataFromSCMRunning; private final String threadNamePrefix; + private final ReconStorageContainerSyncHelper containerSyncHelper; // To Do :- Refactor the constructor in a separate JIRA @Inject @@ -385,6 +380,12 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, reconSafeModeMgrTask = new ReconSafeModeMgrTask( containerManager, nodeManager, safeModeManager, reconTaskConfig, ozoneConfiguration); + + containerSyncHelper = new ReconStorageContainerSyncHelper( + scmServiceProvider, + ozoneConfiguration, + containerManager + ); } /** @@ -566,73 +567,13 @@ public void updateReconSCMDBWithNewSnapshot() throws IOException { } } - public boolean syncWithSCMContainerInfo() - throws IOException { + public boolean syncWithSCMContainerInfo() { if (isSyncDataFromSCMRunning.compareAndSet(false, true)) { - try { - List containers = containerManager.getContainers(); - - long totalContainerCount = scmServiceProvider.getContainerCount( - HddsProtos.LifeCycleState.CLOSED); - long containerCountPerCall = - getContainerCountPerCall(totalContainerCount); - long startContainerId = 1; - long retrievedContainerCount = 0; - if (totalContainerCount > 0) { - while (retrievedContainerCount < 
totalContainerCount) { - List listOfContainers = scmServiceProvider. - getListOfContainers(startContainerId, - Long.valueOf(containerCountPerCall).intValue(), - HddsProtos.LifeCycleState.CLOSED); - if (null != listOfContainers && !listOfContainers.isEmpty()) { - LOG.info("Got list of containers from SCM : " + - listOfContainers.size()); - listOfContainers.forEach(containerInfo -> { - long containerID = containerInfo.getContainerID(); - boolean isContainerPresentAtRecon = - containers.contains(containerInfo); - if (!isContainerPresentAtRecon) { - try { - ContainerWithPipeline containerWithPipeline = - scmServiceProvider.getContainerWithPipeline( - containerID); - containerManager.addNewContainer(containerWithPipeline); - } catch (IOException e) { - LOG.error("Could not get container with pipeline " + - "for container : {}", containerID); - } - } - }); - startContainerId = listOfContainers.get( - listOfContainers.size() - 1).getContainerID() + 1; - } else { - LOG.info("No containers found at SCM in CLOSED state"); - return false; - } - retrievedContainerCount += containerCountPerCall; - } - } - } catch (IOException e) { - LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e); - return false; - } + return containerSyncHelper.syncWithSCMContainerInfo(); } else { LOG.debug("SCM DB sync is already running."); return false; } - return true; - } - - private long getContainerCountPerCall(long totalContainerCount) { - // Assumption of size of 1 container info object here is 1 MB - long containersMetaDataTotalRpcRespSizeMB = - CONTAINER_METADATA_SIZE * totalContainerCount; - long hadoopRPCSize = ozoneConfiguration.getInt(IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT); - long containerCountPerCall = containersMetaDataTotalRpcRespSizeMB <= - hadoopRPCSize ? 
totalContainerCount : - Math.round(Math.floor( - hadoopRPCSize / (double) CONTAINER_METADATA_SIZE)); - return containerCountPerCall; } private void deleteOldSCMDB() throws IOException { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java new file mode 100644 index 000000000000..c8d940aa8357 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.scm; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ReconStorageContainerSyncHelper { + + // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte long = ~12 bytes). + // Used to derive the maximum batch size that fits within ipc.maximum.data.length. 
+ private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12; + + private static final Logger LOG = LoggerFactory + .getLogger(ReconStorageContainerSyncHelper.class); + + private final StorageContainerServiceProvider scmServiceProvider; + private final OzoneConfiguration ozoneConfiguration; + private final ReconContainerManager containerManager; + + ReconStorageContainerSyncHelper(StorageContainerServiceProvider scmServiceProvider, + OzoneConfiguration ozoneConfiguration, + ReconContainerManager containerManager) { + this.scmServiceProvider = scmServiceProvider; + this.ozoneConfiguration = ozoneConfiguration; + this.containerManager = containerManager; + } + + public boolean syncWithSCMContainerInfo() { + try { + long totalContainerCount = scmServiceProvider.getContainerCount( + HddsProtos.LifeCycleState.CLOSED); + long containerCountPerCall = + getContainerCountPerCall(totalContainerCount); + ContainerID startContainerId = ContainerID.valueOf(1); + long retrievedContainerCount = 0; + if (totalContainerCount > 0) { + while (retrievedContainerCount < totalContainerCount) { + List listOfContainers = scmServiceProvider. 
+ getListOfContainerIDs(startContainerId, + Long.valueOf(containerCountPerCall).intValue(), + HddsProtos.LifeCycleState.CLOSED); + if (null != listOfContainers && !listOfContainers.isEmpty()) { + LOG.info("Got list of containers from SCM : {}", listOfContainers.size()); + listOfContainers.forEach(containerID -> { + boolean isContainerPresentAtRecon = containerManager.containerExist(containerID); + if (!isContainerPresentAtRecon) { + try { + ContainerWithPipeline containerWithPipeline = + scmServiceProvider.getContainerWithPipeline( + containerID.getId()); + containerManager.addNewContainer(containerWithPipeline); + } catch (IOException e) { + LOG.error("Could not get container with pipeline " + + "for container : {}", containerID); + } + } + }); + long lastID = listOfContainers.get(listOfContainers.size() - 1).getId(); + startContainerId = ContainerID.valueOf(lastID + 1); + } else { + LOG.info("No containers found at SCM in CLOSED state"); + return false; + } + retrievedContainerCount += containerCountPerCall; + } + } + } catch (Exception e) { + LOG.error("Unable to refresh Recon SCM DB Snapshot. 
", e); + return false; + } + return true; + } + + private long getContainerCountPerCall(long totalContainerCount) { + long hadoopRPCSize = ozoneConfiguration.getInt( + IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT); + long countByRpcLimit = hadoopRPCSize / CONTAINER_ID_PROTO_SIZE_BYTES; + long countByBatchLimit = ozoneConfiguration.getLong( + OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE, + OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT); + + long batchSize = Math.min(countByRpcLimit, countByBatchLimit); + return Math.min(totalContainerCount, batchSize); + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java index 412bd3027662..9e73c30edb81 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; @@ -80,15 +80,15 @@ List getExistContainerWithPipelinesInBatch( DBCheckpoint getSCMDBSnapshot(); /** - * Get the list of containers from SCM. This is a RPC call. + * Get the list of container IDs from SCM. This is an RPC call. 
* * @param startContainerID the start container id * @param count the number of containers to return * @param state the containers in given state to be returned - * @return the list of containers from SCM in a given state + * @return the list of container IDs from SCM in a given state * @throws IOException */ - List getListOfContainers(long startContainerID, + List getListOfContainerIDs(ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) throws IOException; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java index edd1c1f702b8..6d4e31042341 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.ha.InterSCMGrpcClient; import org.apache.hadoop.hdds.scm.ha.SCMSnapshotDownloader; @@ -185,10 +185,9 @@ private RocksDBCheckpoint getRocksDBCheckpoint(String snapshotFileName, File tar } @Override - public List getListOfContainers( - long startContainerID, int count, HddsProtos.LifeCycleState state) + public List getListOfContainerIDs( + ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) throws IOException { - return scmClient.getListOfContainers(startContainerID, count, state); + return 
scmClient.getListOfContainerIDs(startContainerID, count, state); } - } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java new file mode 100644 index 000000000000..9ba0d85a931b --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Unit tests for {@link ReconStorageContainerSyncHelper}, driving the
 * SCM-to-Recon container sync against mocked
 * {@link StorageContainerServiceProvider} and {@link ReconContainerManager}
 * collaborators.
 */
class TestReconStorageContainerSyncHelper {

  private final StorageContainerServiceProvider mockScmServiceProvider =
      mock(StorageContainerServiceProvider.class);

  private final ReconContainerManager mockContainerManager =
      mock(ReconContainerManager.class);

  // Helper under test, built with a default OzoneConfiguration so batching
  // uses the default ozone.recon.scm.container.id.batch.size.
  private final ReconStorageContainerSyncHelper syncHelper;

  TestReconStorageContainerSyncHelper() {
    syncHelper = new ReconStorageContainerSyncHelper(
        mockScmServiceProvider,
        new OzoneConfiguration(),
        mockContainerManager
    );
  }

  // A container known to SCM but absent from Recon must be fetched with its
  // pipeline and added to the container manager.
  @Test
  void testContainerMissingFromReconIsAdded() throws Exception {
    ContainerID cid = ContainerID.valueOf(42L);
    ContainerInfo info = new ContainerInfo.Builder()
        .setContainerID(42L)
        .setState(CLOSED)
        .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
        .setOwner("test")
        .build();
    ContainerWithPipeline cwp = new ContainerWithPipeline(info, null);

    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L);
    // Sync always starts paging from container ID 1.
    when(mockScmServiceProvider.getListOfContainerIDs(
        eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED)))
        .thenReturn(Collections.singletonList(cid));
    when(mockContainerManager.containerExist(cid)).thenReturn(false);
    when(mockScmServiceProvider.getContainerWithPipeline(42L)).thenReturn(cwp);

    boolean result = syncHelper.syncWithSCMContainerInfo();

    assertTrue(result);
    verify(mockScmServiceProvider).getContainerWithPipeline(42L);
    verify(mockContainerManager).addNewContainer(cwp);
  }

  // Exercises the paging loop: with batch size 2 and 3 total containers, the
  // sync must issue two page requests and resume from (last ID + 1).
  @Test
  void testContainerMissingFromReconIsAddedWhenMultiplePages() throws Exception {
    // Force containerCountPerCall = 2 via the batch size config
    OzoneConfiguration pagedConf = new OzoneConfiguration();
    pagedConf.setLong(OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE, 2L);
    ReconStorageContainerSyncHelper pagedHelper = new ReconStorageContainerSyncHelper(
        mockScmServiceProvider, pagedConf, mockContainerManager);

    // Page 1: containers 1 and 2 (both missing from Recon)
    ContainerID cid1 = ContainerID.valueOf(1L);
    ContainerID cid2 = ContainerID.valueOf(2L);
    ContainerInfo info1 = new ContainerInfo.Builder()
        .setContainerID(1L).setState(CLOSED)
        .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
        .setOwner("test").build();
    ContainerInfo info2 = new ContainerInfo.Builder()
        .setContainerID(2L).setState(CLOSED)
        .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
        .setOwner("test").build();
    ContainerWithPipeline cwp1 = new ContainerWithPipeline(info1, null);
    ContainerWithPipeline cwp2 = new ContainerWithPipeline(info2, null);

    // Page 2: container 3 (already in Recon)
    ContainerID cid3 = ContainerID.valueOf(3L);

    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(3L);
    when(mockScmServiceProvider.getListOfContainerIDs(
        eq(ContainerID.valueOf(1L)), eq(2), eq(CLOSED)))
        .thenReturn(Arrays.asList(cid1, cid2));
    when(mockScmServiceProvider.getListOfContainerIDs(
        eq(ContainerID.valueOf(3L)), eq(2), eq(CLOSED)))
        .thenReturn(Collections.singletonList(cid3));

    when(mockContainerManager.containerExist(cid1)).thenReturn(false);
    when(mockContainerManager.containerExist(cid2)).thenReturn(false);
    when(mockContainerManager.containerExist(cid3)).thenReturn(true);
    when(mockScmServiceProvider.getContainerWithPipeline(1L)).thenReturn(cwp1);
    when(mockScmServiceProvider.getContainerWithPipeline(2L)).thenReturn(cwp2);

    boolean result = pagedHelper.syncWithSCMContainerInfo();

    assertTrue(result);
    // Page 1: both missing containers were added
    verify(mockContainerManager).addNewContainer(cwp1);
    verify(mockContainerManager).addNewContainer(cwp2);
    // Page 2: present container was skipped
    verify(mockContainerManager, never()).addNewContainer(
        argThat(cwp -> cwp.getContainerInfo().getContainerID() == 3L));
    // Both pages were fetched
    verify(mockScmServiceProvider).getListOfContainerIDs(
        eq(ContainerID.valueOf(1L)), eq(2), eq(CLOSED));
    verify(mockScmServiceProvider).getListOfContainerIDs(
        eq(ContainerID.valueOf(3L)), eq(2), eq(CLOSED));
  }

  // A container Recon already tracks must not trigger a pipeline fetch or a
  // duplicate add.
  @Test
  void testContainerAlreadyInReconIsSkipped() throws Exception {
    ContainerID cid = ContainerID.valueOf(7L);

    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L);
    when(mockScmServiceProvider.getListOfContainerIDs(
        eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED)))
        .thenReturn(Collections.singletonList(cid));
    when(mockContainerManager.containerExist(cid)).thenReturn(true);

    boolean result = syncHelper.syncWithSCMContainerInfo();

    assertTrue(result);
    verify(mockScmServiceProvider, never()).getContainerWithPipeline(anyLong());
    verify(mockContainerManager, never()).addNewContainer(any());
  }

  // A zero CLOSED-container count is a successful no-op sweep: no paging RPCs
  // and no container-manager interaction.
  @Test
  void testZeroClosedContainersReturnsTrue() throws Exception {
    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(0L);

    boolean result = syncHelper.syncWithSCMContainerInfo();

    assertTrue(result);
    verifyNoInteractions(mockContainerManager);
    verify(mockScmServiceProvider, never())
        .getListOfContainerIDs(any(), any(Integer.class), any());
  }

  // SCM reporting a positive count but returning an empty page means the sync
  // cannot make progress and must report failure.
  @Test
  void testEmptyListFromSCMReturnsFalse() throws Exception {
    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L);
    when(mockScmServiceProvider.getListOfContainerIDs(
        eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED)))
        .thenReturn(Collections.emptyList());

    boolean result = syncHelper.syncWithSCMContainerInfo();

    assertFalse(result);
    verifyNoInteractions(mockContainerManager);
  }

}