From 1ed200e2d87ffcad2b282f2af6c5cc3c46d73350 Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Fri, 27 Feb 2026 10:51:12 +0000 Subject: [PATCH 01/10] HDDS-14730. Using container ids for recon sync --- .../StorageContainerLocationProtocol.java | 4 ++ ...ocationProtocolClientSideTranslatorPB.java | 31 +++++++++++++ .../src/main/proto/ScmAdminProtocol.proto | 14 ++++++ .../hdds/scm/container/ContainerManager.java | 37 +++++++++++++++ .../scm/container/ContainerManagerImpl.java | 15 ++++++ .../scm/container/ContainerStateManager.java | 18 ++++++++ .../container/ContainerStateManagerImpl.java | 14 ++++++ .../container/states/ContainerStateMap.java | 26 +++++++++++ ...ocationProtocolServerSideTranslatorPB.java | 46 ++++++++++++++++--- .../scm/server/SCMClientProtocolServer.java | 22 +++++++++ .../ReconStorageContainerManagerFacade.java | 21 ++++----- .../spi/StorageContainerServiceProvider.java | 15 ++++++ .../StorageContainerServiceProviderImpl.java | 8 ++++ 13 files changed, 253 insertions(+), 18 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 1f0f8cd8b066..2a824fb5c800 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -483,6 +483,10 @@ StatusAndMessages queryUpgradeFinalizationProgress( long getContainerCount(HddsProtos.LifeCycleState state) throws IOException; + List getListOfContainerIDs( + ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) + throws IOException; + List getListOfContainers( long startContainerID, int count, HddsProtos.LifeCycleState state) throws IOException; diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 7072d6090baa..9f1ff4443bc1 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -27,12 +27,14 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.client.ECReplicationConfig; @@ -106,6 +108,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SafeModeRuleStatusProto; @@ -1199,6 +1203,33 @@ public void close() { RPC.stopProxy(rpcProxy); } + @Override + public List getListOfContainerIDs( + ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) + throws IOException { + Preconditions.checkState(startContainerID.getId() >= 0, + "Container ID cannot be negative."); + Preconditions.checkState(count > 0, + "Container count must be greater than 0."); + SCMListContainerIDsRequestProto.Builder builder = SCMListContainerIDsRequestProto + .newBuilder(); + builder.setStartContainerID(startContainerID.getProtobuf()); + builder.setCount(count); + builder.setTraceID(TracingUtil.exportCurrentSpan()); + builder.setState(state); + + SCMListContainerIDsRequestProto request = builder.build(); + + SCMListContainerIDsResponseProto response = + submitRequest(Type.ListContainerIDs, + builder1 -> builder1.setScmListContainerIDsRequest(request)) + .getScmListContainerIDsResponse(); + return response.getContainerIDsList() + .stream() + .map(ContainerID::getFromProtobuf) + .collect(Collectors.toList()); + } + @Override public List getListOfContainers( long startContainerID, int count, HddsProtos.LifeCycleState state) diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index cadff023a061..85ed13411288 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -86,6 +86,7 @@ message ScmContainerLocationRequest { optional GetMetricsRequestProto getMetricsRequest = 47; optional ContainerBalancerStatusInfoRequestProto containerBalancerStatusInfoRequest = 48; optional ReconcileContainerRequestProto reconcileContainerRequest = 49; + optional SCMListContainerIDsRequestProto scmListContainerIDsRequest = 50; } message ScmContainerLocationResponse { @@ -143,6 
+144,7 @@ message ScmContainerLocationResponse { optional GetMetricsResponseProto getMetricsResponse = 47; optional ContainerBalancerStatusInfoResponseProto containerBalancerStatusInfoResponse = 48; optional ReconcileContainerResponseProto reconcileContainerResponse = 49; + optional SCMListContainerIDsResponseProto scmListContainerIDsResponse = 50; enum Status { OK = 1; @@ -199,6 +201,7 @@ enum Type { GetMetrics = 43; GetContainerBalancerStatusInfo = 44; ReconcileContainer = 45; + ListContainerIDs = 46; } /** @@ -288,6 +291,17 @@ message GetExistContainerWithPipelinesInBatchResponseProto { repeated ContainerWithPipeline containerWithPipelines = 1; } +message SCMListContainerIDsRequestProto { + required uint32 count = 1; + optional ContainerID startContainerID = 2; + optional LifeCycleState state = 3; + optional string traceID = 4; +} + +message SCMListContainerIDsResponseProto { + repeated ContainerID containerIDs = 1; +} + message SCMListContainerRequestProto { required uint32 count = 1; optional uint64 startContainerID = 2; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index fb349720d234..4de63575c6ef 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -55,6 +55,43 @@ default List getContainers() { return getContainers(ContainerID.valueOf(0), Integer.MAX_VALUE); } + default List getContainerIDs() { + return getContainerIDs(ContainerID.valueOf(0), Integer.MAX_VALUE); + } + + /** + * Returns container IDs under certain conditions. + * Search container IDs from start ID(exclusive), + * The max size of the searching range cannot exceed the + * value of count. + * + * @param startID start containerID, >=0, + * start searching at the head if 0. 
+ * @param count count must be >= 0 + * Usually the count will be replaced with a very big + * value instead of being unlimited in case the db is very big. + * + * @return a list of container IDs. + */ + List getContainerIDs(ContainerID startID, int count); + + /** + * Returns container IDs under certain conditions. + * Search container IDs from start ID(exclusive), + * The max size of the searching range cannot exceed the + * value of count. + * + * @param startID start containerID, >=0, + * start searching at the head if 0. + * @param count count must be >= 0 + * Usually the count will be replaced with a very big + * value instead of being unlimited in case the db is very big. + * @param state container state + * + * @return a list of container IDs. + */ + List getContainerIDs(ContainerID startID, int count, LifeCycleState state); + /** * Returns containers under certain conditions. * Search container IDs from start ID(exclusive), diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index d255bc9a672d..b5374bb77e89 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -147,6 +147,21 @@ public List getContainers(ReplicationType type) { return containerStateManager.getContainerInfos(type); } + @Override + public List getContainerIDs(final ContainerID startID, + final int count) { + scmContainerManagerMetrics.incNumListContainersOps(); + return containerStateManager.getContainerIDs(startID, count); + } + + @Override + public List getContainerIDs(final ContainerID startID, + final int count, + final LifeCycleState state) { + scmContainerManagerMetrics.incNumListContainersOps(); + return containerStateManager.getContainerIDs(state, startID, 
count); + } + @Override public List getContainers(final ContainerID startID, final int count) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 0194e65fe00a..c3f9bcd7240a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -103,6 +103,24 @@ public interface ContainerStateManager { */ boolean contains(ContainerID containerID); + /** + * Get {@link ContainerID}s. + * + * @param start the start {@link ContainerID} (inclusive) + * @param count the size limit + * @return a list of {@link ContainerID}; + */ + List getContainerIDs(ContainerID start, int count); + + /** + * Get {@link ContainerID}s for the given state. + * + * @param start the start {@link ContainerID} (inclusive) + * @param count the size limit + * @return a list of {@link ContainerID}; + */ + List getContainerIDs(LifeCycleState state, ContainerID start, int count); + /** * Get {@link ContainerInfo}s. 
* diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java index 4b4578894a61..5adbe19223f2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -265,6 +265,20 @@ private void initialize() throws IOException { return actions; } + @Override + public List getContainerIDs(ContainerID start, int count) { + try (AutoCloseableLock ignored = readLock()) { + return containers.getContainerIDs(start, count); + } + } + + @Override + public List getContainerIDs(LifeCycleState state, ContainerID start, int count) { + try (AutoCloseableLock ignored = readLock()) { + return containers.getContainerIDs(state, start, count); + } + } + @Override public List getContainerInfos(ContainerID start, int count) { try (AutoCloseableLock ignored = readLock()) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index b1ff5f4ae488..c66daf9dcc46 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -109,6 +109,14 @@ ContainerInfo getInfo(ContainerID id) { return entry == null ? 
null : entry.getInfo(); } + List getContainerIDs(ContainerID start, int count) { + Objects.requireNonNull(start, "start == null"); + Preconditions.assertTrue(count >= 0, "count < 0"); + return map.tailMap(start).keySet().stream() + .limit(count) + .collect(Collectors.toList()); + } + List getInfos(ContainerID start, int count) { Objects.requireNonNull(start, "start == null"); Preconditions.assertTrue(count >= 0, "count < 0"); @@ -260,6 +268,24 @@ public void updateState(ContainerID containerID, LifeCycleState currentState, currentInfo.setState(newState); } + public List getContainerIDs(ContainerID start, int count) { + return containerMap.getContainerIDs(start, count); + } + + /** + * + * @param state the state of the containers + * @param start the start id + * @param count the maximum size of the returned list + * @return a list of sorted {@link ContainerID}s + */ + public List getContainerIDs(LifeCycleState state, ContainerID start, int count) { + Preconditions.assertTrue(count >= 0, "count < 0"); + return lifeCycleStateMap.tailMap(state, start).keySet().stream() + .limit(count) + .collect(Collectors.toList()); + } + public List getContainerInfos(ContainerID start, int count) { return containerMap.getInfos(start, count); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 8ce5ae44ab6b..c63269e2ca5a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -17,10 +17,16 @@ package org.apache.hadoop.hdds.scm.protocol; +import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoRequestProto; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerCountRequestProto; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.errorPipelineAlreadyExists; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.success; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto.Status.CONTAINER_ALREADY_CLOSED; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto.Status.CONTAINER_ALREADY_CLOSING; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type.GetContainer; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type.GetContainerWithPipeline; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type.GetContainerWithPipelineBatch; @@ -47,7 +53,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus; -import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; @@ -115,6 +120,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SafeModeRuleStatusProto; @@ -739,6 +745,12 @@ public ScmContainerLocationResponse processRequest( .setStatus(Status.OK) .setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest())) .build(); + case ListContainerIDs: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setScmListContainerIDsResponse(listContainerIDs(request.getScmListContainerIDsRequest())) + .build(); default: throw new IllegalArgumentException( "Unknown command type: " + request.getCmdType()); @@ -895,7 +907,7 @@ public SCMDeleteContainerResponseProto deleteContainer( } public NodeQueryResponseProto queryNode( - StorageContainerLocationProtocolProtos.NodeQueryRequestProto request, + NodeQueryRequestProto request, 
int clientVersion) throws IOException { HddsProtos.NodeOperationalState opState = null; @@ -945,7 +957,7 @@ public SCMCloseContainerResponseProto closeContainer( } public PipelineResponseProto allocatePipeline( - StorageContainerLocationProtocolProtos.PipelineRequestProto request, + PipelineRequestProto request, int clientVersion) throws IOException { Pipeline pipeline = impl.createReplicationPipeline( request.getReplicationType(), request.getReplicationFactor(), @@ -1216,7 +1228,7 @@ public ContainerBalancerStatusResponseProto getContainerBalancerStatus( } public ContainerBalancerStatusInfoResponseProto getContainerBalancerStatusInfo( - StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoRequestProto request) + ContainerBalancerStatusInfoRequestProto request) throws IOException { return impl.getContainerBalancerStatusInfo(); } @@ -1287,7 +1299,7 @@ public StartMaintenanceNodesResponseProto startMaintenanceNodes( } public DatanodeUsageInfoResponseProto getDatanodeUsageInfo( - StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto + DatanodeUsageInfoRequestProto request, int clientVersion) throws IOException { List infoList; @@ -1306,7 +1318,7 @@ public DatanodeUsageInfoResponseProto getDatanodeUsageInfo( } public GetContainerCountResponseProto getContainerCount( - StorageContainerLocationProtocolProtos.GetContainerCountRequestProto + GetContainerCountRequestProto request) throws IOException { return GetContainerCountResponseProto.newBuilder() @@ -1315,7 +1327,7 @@ public GetContainerCountResponseProto getContainerCount( } public GetContainerCountResponseProto getClosedContainerCount( - StorageContainerLocationProtocolProtos.GetContainerCountRequestProto + GetContainerCountRequestProto request) throws IOException { return GetContainerCountResponseProto.newBuilder() @@ -1363,4 +1375,24 @@ public ReconcileContainerResponseProto reconcileContainer(ReconcileContainerRequ impl.reconcileContainer(request.getContainerID()); return 
ReconcileContainerResponseProto.getDefaultInstance(); } + + public SCMListContainerIDsResponseProto listContainerIDs(SCMListContainerIDsRequestProto request) throws IOException { + ContainerID startContainerID = ContainerID.valueOf(0); + + if (request.hasStartContainerID()) { + startContainerID = ContainerID.valueOf(request.getStartContainerID().getId()); + } + + SCMListContainerIDsResponseProto.Builder builder = + SCMListContainerIDsResponseProto.newBuilder(); + + List containerIDs = impl.getListOfContainerIDs( + startContainerID, request.getCount(), request.getState()); + + containerIDs.stream() + .map(ContainerID::getProtobuf) + .forEach(builder::addContainerIDs); + + return builder.build(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 370493b6dc3c..6b8c47650d12 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1498,6 +1498,28 @@ public long getContainerCount(HddsProtos.LifeCycleState state) } } + @Override + public List getListOfContainerIDs( + ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) + throws IOException { + + final Map auditMap = Maps.newHashMap(); + auditMap.put("startContainerID", String.valueOf(startContainerID)); + auditMap.put("count", String.valueOf(count)); + auditMap.put("state", String.valueOf(state)); + try { + List results = scm.getContainerManager().getContainerIDs( + startContainerID, count, state); + AUDIT.logReadSuccess(buildAuditMessageForSuccess( + SCMAction.LIST_CONTAINER, auditMap)); + return results; + } catch (Exception ex) { + AUDIT.logReadFailure(buildAuditMessageForFailure( + SCMAction.LIST_CONTAINER, auditMap, ex)); + throw ex; + } + } + @Override public 
List getListOfContainers( long startContainerID, int count, HddsProtos.LifeCycleState state) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index b6b7e3cf5b41..8a373307b2d4 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -68,7 +68,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; @@ -536,32 +536,31 @@ public boolean syncWithSCMContainerInfo() throws IOException { if (isSyncDataFromSCMRunning.compareAndSet(false, true)) { try { - List containers = containerManager.getContainers(); + List containerIDs = containerManager.getContainerIDs(); long totalContainerCount = scmServiceProvider.getContainerCount( HddsProtos.LifeCycleState.CLOSED); long containerCountPerCall = getContainerCountPerCall(totalContainerCount); - long startContainerId = 1; + ContainerID startContainerId = ContainerID.valueOf(1); long retrievedContainerCount = 0; if (totalContainerCount > 0) { while (retrievedContainerCount < totalContainerCount) { - List listOfContainers = scmServiceProvider. - getListOfContainers(startContainerId, + List listOfContainers = scmServiceProvider. 
+ getListOfContainerIDs(startContainerId, Long.valueOf(containerCountPerCall).intValue(), HddsProtos.LifeCycleState.CLOSED); if (null != listOfContainers && !listOfContainers.isEmpty()) { LOG.info("Got list of containers from SCM : " + listOfContainers.size()); - listOfContainers.forEach(containerInfo -> { - long containerID = containerInfo.getContainerID(); + listOfContainers.forEach(containerID -> { boolean isContainerPresentAtRecon = - containers.contains(containerInfo); + containerIDs.contains(containerID); if (!isContainerPresentAtRecon) { try { ContainerWithPipeline containerWithPipeline = scmServiceProvider.getContainerWithPipeline( - containerID); + containerID.getId()); containerManager.addNewContainer(containerWithPipeline); } catch (IOException e) { LOG.error("Could not get container with pipeline " + @@ -569,8 +568,8 @@ public boolean syncWithSCMContainerInfo() } } }); - startContainerId = listOfContainers.get( - listOfContainers.size() - 1).getContainerID() + 1; + long lastID = listOfContainers.get(listOfContainers.size() - 1).getId(); + startContainerId = ContainerID.valueOf(lastID + 1); } else { LOG.info("No containers found at SCM in CLOSED state"); return false; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java index 412bd3027662..6cd15f0c004d 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -79,6 +80,20 @@ List getExistContainerWithPipelinesInBatch( */ DBCheckpoint getSCMDBSnapshot(); + /** + * Get the list of container IDs from SCM. This is an RPC call. + * + * @param startContainerID the start container id + * @param count the number of containers to return + * @param state the containers in given state to be returned + * @return the list of container IDs from SCM in a given state + * @throws IOException + */ + List getListOfContainerIDs(ContainerID startContainerID, + int count, + HddsProtos.LifeCycleState state) + throws IOException; + /** * Get the list of containers from SCM. This is a RPC call. * diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java index 3b6164447b3c..8a46c85ddc20 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.ha.InterSCMGrpcClient; @@ -184,6 +185,13 @@ private RocksDBCheckpoint getRocksDBCheckpoint(String snapshotFileName, File tar return new RocksDBCheckpoint(untarredDbDir); } + @Override + public List getListOfContainerIDs( + ContainerID startContainerID, int count, 
HddsProtos.LifeCycleState state) + throws IOException { + return scmClient.getListOfContainerIDs(startContainerID, count, state); + } + @Override public List getListOfContainers( long startContainerID, int count, HddsProtos.LifeCycleState state) From 155d7654a447433e07273d7a4564578d17a74052 Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Fri, 27 Feb 2026 11:52:45 +0000 Subject: [PATCH 02/10] HDDS-14730. Using container ids for recon sync --- .../StorageContainerLocationProtocolServerSideTranslatorPB.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 6d916fa61c6a..e2b19aba8c2f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; From f8c943f8a79165491fd09dcd2fb56a9af5af9f4b Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Fri, 27 Feb 2026 16:59:20 +0000 Subject: 
[PATCH 03/10] HDDS-14730. Using container ids for recon sync --- .../container/TestContainerManagerImpl.java | 15 +++++++++ .../container/TestContainerStateManager.java | 32 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index daf0b4b4c6a8..bb0d77e6f8aa 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -276,6 +276,21 @@ void testTransitionContainerToClosedStateAllowOnlyDeletingOrDeletedContainers() assertThrows(IOException.class, () -> containerManager.transitionDeletingOrDeletedToClosedState(ecCid)); } + @Test + void testGetContainerIds() throws IOException { + assertEquals(emptyList(), containerManager.getContainerIDs()); + + List ids = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + ContainerInfo container = containerManager.allocateContainer( + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), "admin"); + ids.add(container.containerID()); + } + + assertEquals(ids, containerManager.getContainerIDs(ContainerID.MIN, 10)); + assertEquals(ids.subList(0, 5), containerManager.getContainerIDs(ContainerID.MIN, 5)); + } + @Test void testGetContainers() throws Exception { assertEquals(emptyList(), containerManager.getContainers()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java index 869b7dd5bb9d..4c88bad80779 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -370,6 +370,38 @@ private void verifyContainerState(ContainerID containerId, assertEquals(expectedState, containerManager.getContainer(containerId).getState()); } + @Test + public void testGetContainerIDs() throws IOException { + ContainerInfo openContainerInfo = new ContainerInfo.Builder() + .setContainerID(1) + .setState(HddsProtos.LifeCycleState.OPEN) + .setSequenceId(100L) + .setOwner("scm") + .setPipelineID(PipelineID.randomId()) + .setReplicationConfig( + RatisReplicationConfig + .getInstance(ReplicationFactor.THREE)) + .build(); + + ContainerInfo closedContainerInfo = new ContainerInfo.Builder() + .setContainerID(2) + .setState(HddsProtos.LifeCycleState.CLOSED) + .setSequenceId(200L) + .setOwner("scm") + .setPipelineID(PipelineID.randomId()) + .setReplicationConfig( + RatisReplicationConfig + .getInstance(ReplicationFactor.THREE)) + .build(); + + containerStateManager.addContainer(openContainerInfo.getProtobuf()); + containerStateManager.addContainer(closedContainerInfo.getProtobuf()); + + assertEquals(2, containerStateManager.getContainerIDs(ContainerID.MIN, 10).size()); + assertEquals(1, containerStateManager.getContainerIDs( + HddsProtos.LifeCycleState.CLOSED, ContainerID.MIN, 10).size()); + } + @Test public void testSequenceIdOnStateUpdate() throws Exception { ContainerID containerID = ContainerID.valueOf(3L); From 47191c5bbd887532da64220e53468bb626bc622d Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Mon, 2 Mar 2026 15:58:25 +0000 Subject: [PATCH 04/10] HDDS-14730. 
Using container ids for recon sync --- .../StorageContainerLocationProtocol.java | 4 - ...ocationProtocolClientSideTranslatorPB.java | 7 - .../hdds/scm/container/ContainerManager.java | 22 +-- .../scm/container/ContainerManagerImpl.java | 7 - .../scm/container/ContainerStateManager.java | 9 -- .../container/ContainerStateManagerImpl.java | 7 - .../container/states/ContainerStateMap.java | 12 -- ...ocationProtocolServerSideTranslatorPB.java | 29 ++-- .../scm/server/SCMClientProtocolServer.java | 26 +--- .../apache/hadoop/ozone/audit/SCMAction.java | 3 +- .../container/TestContainerManagerImpl.java | 15 --- .../container/TestContainerStateManager.java | 1 - .../states/TestContainerStateMap.java | 79 +++++++++++ .../ReconStorageContainerManagerFacade.java | 72 ++-------- .../scm/ReconStorageContainerSyncHelper.java | 111 +++++++++++++++ .../spi/StorageContainerServiceProvider.java | 15 --- .../StorageContainerServiceProviderImpl.java | 9 -- .../TestReconStorageContainerSyncHelper.java | 127 ++++++++++++++++++ 18 files changed, 346 insertions(+), 209 deletions(-) create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java create mode 100644 hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 92db9468dd34..fc7d80fde7b6 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -508,10 +508,6 @@ List 
getListOfContainerIDs( ContainerID startContainerID, int count, HddsProtos.LifeCycleState state) throws IOException; - List getListOfContainers( - long startContainerID, int count, HddsProtos.LifeCycleState state) - throws IOException; - DecommissionScmResponseProto decommissionScm( String scmId) throws IOException; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index bd4a584882d6..eb9d005f9043 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -1273,13 +1273,6 @@ public List getListOfContainerIDs( .collect(Collectors.toList()); } - @Override - public List getListOfContainers( - long startContainerID, int count, HddsProtos.LifeCycleState state) - throws IOException { - return listContainer(startContainerID, count, state).getContainerInfoList(); - } - @Override public DecommissionScmResponseProto decommissionScm( String scmId) throws IOException { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index 55d55d3cfb32..88d4831ee4f1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -55,29 +55,9 @@ default List getContainers() { return getContainers(ContainerID.valueOf(0), Integer.MAX_VALUE); } - default List getContainerIDs() { - return getContainerIDs(ContainerID.valueOf(0), 
Integer.MAX_VALUE); - } - - /** - * Returns container IDs under certain conditions. - * Search container IDs from start ID(exclusive), - * The max size of the searching range cannot exceed the - * value of count. - * - * @param startID start containerID, >=0, - * start searching at the head if 0. - * @param count count must be >= 0 - * Usually the count will be replaced with a very big - * value instead of being unlimited in case the db is very big. - * - * @return a list of container IDs. - */ - List getContainerIDs(ContainerID startID, int count); - /** * Returns container IDs under certain conditions. - * Search container IDs from start ID(exclusive), + * Search container IDs from start ID(inclusive), * The max size of the searching range cannot exceed the * value of count. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 84fdd3611c97..53c8019d7037 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -141,13 +141,6 @@ public List getContainers(ReplicationType type) { return containerStateManager.getContainerInfos(type); } - @Override - public List getContainerIDs(final ContainerID startID, - final int count) { - scmContainerManagerMetrics.incNumListContainersOps(); - return containerStateManager.getContainerIDs(startID, count); - } - @Override public List getContainerIDs(final ContainerID startID, final int count, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 715e11a125f9..8037122676ce 100644 --- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -103,15 +103,6 @@ public interface ContainerStateManager { */ boolean contains(ContainerID containerID); - /** - * Get {@link ContainerID}s. - * - * @param start the start {@link ContainerID} (inclusive) - * @param count the size limit - * @return a list of {@link ContainerID}; - */ - List getContainerIDs(ContainerID start, int count); - /** * Get {@link ContainerID}s for the given state. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java index 4acfaec04034..206debf9ff41 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -265,13 +265,6 @@ private void initialize() throws IOException { return actions; } - @Override - public List getContainerIDs(ContainerID start, int count) { - try (AutoCloseableLock ignored = readLock()) { - return containers.getContainerIDs(start, count); - } - } - @Override public List getContainerIDs(LifeCycleState state, ContainerID start, int count) { try (AutoCloseableLock ignored = readLock()) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index c66daf9dcc46..4dd93aef7473 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -109,14 
+109,6 @@ ContainerInfo getInfo(ContainerID id) { return entry == null ? null : entry.getInfo(); } - List getContainerIDs(ContainerID start, int count) { - Objects.requireNonNull(start, "start == null"); - Preconditions.assertTrue(count >= 0, "count < 0"); - return map.tailMap(start).keySet().stream() - .limit(count) - .collect(Collectors.toList()); - } - List getInfos(ContainerID start, int count) { Objects.requireNonNull(start, "start == null"); Preconditions.assertTrue(count >= 0, "count < 0"); @@ -268,10 +260,6 @@ public void updateState(ContainerID containerID, LifeCycleState currentState, currentInfo.setState(newState); } - public List getContainerIDs(ContainerID start, int count) { - return containerMap.getContainerIDs(start, count); - } - /** * * @param state the state of the containers diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index e2b19aba8c2f..37956eb38014 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -17,16 +17,10 @@ package org.apache.hadoop.hdds.scm.protocol; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoRequestProto; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerCountRequestProto; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto; -import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.errorPipelineAlreadyExists; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.success; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto.Status.CONTAINER_ALREADY_CLOSED; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto.Status.CONTAINER_ALREADY_CLOSING; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type.GetContainer; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type.GetContainerWithPipeline; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type.GetContainerWithPipelineBatch; @@ -123,6 +117,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; @@ -921,7 +916,7 @@ public SCMDeleteContainerResponseProto deleteContainer( } public NodeQueryResponseProto queryNode( - NodeQueryRequestProto request, + StorageContainerLocationProtocolProtos.NodeQueryRequestProto request, int clientVersion) throws IOException { HddsProtos.NodeOperationalState opState = null; @@ -971,7 +966,7 @@ public SCMCloseContainerResponseProto closeContainer( } public PipelineResponseProto allocatePipeline( - PipelineRequestProto request, + StorageContainerLocationProtocolProtos.PipelineRequestProto request, int clientVersion) throws IOException { Pipeline pipeline = impl.createReplicationPipeline( request.getReplicationType(), request.getReplicationFactor(), @@ -1247,7 +1242,7 @@ public ContainerBalancerStatusResponseProto getContainerBalancerStatus( } public ContainerBalancerStatusInfoResponseProto getContainerBalancerStatusInfo( - ContainerBalancerStatusInfoRequestProto request) + StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoRequestProto request) throws IOException { return impl.getContainerBalancerStatusInfo(); } @@ -1318,7 +1313,7 @@ public StartMaintenanceNodesResponseProto startMaintenanceNodes( } public DatanodeUsageInfoResponseProto getDatanodeUsageInfo( - DatanodeUsageInfoRequestProto + StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto request, int clientVersion) throws IOException { List infoList; @@ -1337,7 +1332,7 @@ public DatanodeUsageInfoResponseProto getDatanodeUsageInfo( } public GetContainerCountResponseProto getContainerCount( - GetContainerCountRequestProto + StorageContainerLocationProtocolProtos.GetContainerCountRequestProto request) throws IOException { return GetContainerCountResponseProto.newBuilder() @@ -1346,7 +1341,7 @@ public GetContainerCountResponseProto getContainerCount( } public GetContainerCountResponseProto getClosedContainerCount( - 
GetContainerCountRequestProto + StorageContainerLocationProtocolProtos.GetContainerCountRequestProto request) throws IOException { return GetContainerCountResponseProto.newBuilder() @@ -1409,18 +1404,24 @@ public ReconcileContainerResponseProto reconcileContainer(ReconcileContainerRequ return ReconcileContainerResponseProto.getDefaultInstance(); } - public SCMListContainerIDsResponseProto listContainerIDs(SCMListContainerIDsRequestProto request) throws IOException { + public SCMListContainerIDsResponseProto listContainerIDs( + SCMListContainerIDsRequestProto request) throws IOException { ContainerID startContainerID = ContainerID.valueOf(0); if (request.hasStartContainerID()) { startContainerID = ContainerID.valueOf(request.getStartContainerID().getId()); } + HddsProtos.LifeCycleState state = null; + if (request.hasState()) { + state = request.getState(); + } + SCMListContainerIDsResponseProto.Builder builder = SCMListContainerIDsResponseProto.newBuilder(); List containerIDs = impl.getListOfContainerIDs( - startContainerID, request.getCount(), request.getState()); + startContainerID, request.getCount(), state); containerIDs.stream() .map(ContainerID::getProtobuf) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index d6a2e24a45e0..cdd4c3b22da2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1519,33 +1519,11 @@ public List getListOfContainerIDs( List results = scm.getContainerManager().getContainerIDs( startContainerID, count, state); AUDIT.logReadSuccess(buildAuditMessageForSuccess( - SCMAction.LIST_CONTAINER, auditMap)); + SCMAction.LIST_CONTAINER_IDS, auditMap)); return results; } catch (Exception ex) { 
AUDIT.logReadFailure(buildAuditMessageForFailure( - SCMAction.LIST_CONTAINER, auditMap, ex)); - throw ex; - } - } - - @Override - public List getListOfContainers( - long startContainerID, int count, HddsProtos.LifeCycleState state) - throws IOException { - - final Map auditMap = Maps.newHashMap(); - auditMap.put("startContainerID", String.valueOf(startContainerID)); - auditMap.put("count", String.valueOf(count)); - auditMap.put("state", String.valueOf(state)); - try { - List results = scm.getContainerManager().getContainers( - ContainerID.valueOf(startContainerID), count, state); - AUDIT.logReadSuccess(buildAuditMessageForSuccess( - SCMAction.LIST_CONTAINER, auditMap)); - return results; - } catch (Exception ex) { - AUDIT.logReadFailure(buildAuditMessageForFailure( - SCMAction.LIST_CONTAINER, auditMap, ex)); + SCMAction.LIST_CONTAINER_IDS, auditMap, ex)); throw ex; } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index 52cd943c4dbb..b7acb40d7ac8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -68,7 +68,8 @@ public enum SCMAction implements AuditAction { QUERY_NODE, GET_PIPELINE, RECONCILE_CONTAINER, - GET_DELETED_BLOCK_SUMMARY; + GET_DELETED_BLOCK_SUMMARY, + LIST_CONTAINER_IDS; @Override public String getAction() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index bb0d77e6f8aa..daf0b4b4c6a8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ 
-276,21 +276,6 @@ void testTransitionContainerToClosedStateAllowOnlyDeletingOrDeletedContainers() assertThrows(IOException.class, () -> containerManager.transitionDeletingOrDeletedToClosedState(ecCid)); } - @Test - void testGetContainerIds() throws IOException { - assertEquals(emptyList(), containerManager.getContainerIDs()); - - List ids = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - ContainerInfo container = containerManager.allocateContainer( - RatisReplicationConfig.getInstance(ReplicationFactor.THREE), "admin"); - ids.add(container.containerID()); - } - - assertEquals(ids, containerManager.getContainerIDs(ContainerID.MIN, 10)); - assertEquals(ids.subList(0, 5), containerManager.getContainerIDs(ContainerID.MIN, 5)); - } - @Test void testGetContainers() throws Exception { assertEquals(emptyList(), containerManager.getContainers()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java index 4c88bad80779..59fd4685ed62 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -397,7 +397,6 @@ public void testGetContainerIDs() throws IOException { containerStateManager.addContainer(openContainerInfo.getProtobuf()); containerStateManager.addContainer(closedContainerInfo.getProtobuf()); - assertEquals(2, containerStateManager.getContainerIDs(ContainerID.MIN, 10).size()); assertEquals(1, containerStateManager.getContainerIDs( HddsProtos.LifeCycleState.CLOSED, ContainerID.MIN, 10).size()); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java new 
file mode 100644 index 000000000000..b9fbfbbac032 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.states; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.junit.jupiter.api.Test; + +class TestContainerStateMap { + + @Test + void testGetContainerIDs() { + ContainerStateMap map = new ContainerStateMap(); + + List containerInfos = containerInfos(); + + // initialize map + containerInfos.forEach(map::addContainer); + + assertEquals(4, map.getContainerIDs(OPEN, ContainerID.MIN, containerInfos.size()).size()); + assertEquals(4, map.getContainerIDs(CLOSED, ContainerID.MIN, containerInfos.size()).size()); + + // verify pagination + assertEquals(3, map.getContainerIDs(CLOSED, ContainerID.MIN, 3).size()); + assertEquals(3, map.getContainerIDs(CLOSED, ContainerID.valueOf(7), 3).size()); + } + + private List containerInfos() { + return Arrays.asList( + buildContainerInfo(1, OPEN), + buildContainerInfo(2, CLOSED), + buildContainerInfo(3, QUASI_CLOSED), + buildContainerInfo(4, DELETED), + buildContainerInfo(5, OPEN), + buildContainerInfo(6, OPEN), + buildContainerInfo(7, CLOSED), + buildContainerInfo(8, CLOSED), + buildContainerInfo(9, CLOSED), + buildContainerInfo(10, OPEN) + ); + } + + private ContainerInfo buildContainerInfo(long containerID, HddsProtos.LifeCycleState 
state) { + return new ContainerInfo.Builder() + .setContainerID(containerID) + .setState(state) + .setReplicationConfig(StandaloneReplicationConfig.getInstance(THREE)) + .build(); + } + + + +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index ef0fb3381dd6..87503c90ab81 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -62,18 +62,15 @@ import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.ScmUtils; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; -import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler; import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; @@ -170,6 +167,7 @@ public class ReconStorageContainerManagerFacade private 
AtomicBoolean isSyncDataFromSCMRunning; private final String threadNamePrefix; + private final ReconStorageContainerSyncHelper containerSyncHelper; // To Do :- Refactor the constructor in a separate JIRA @Inject @@ -352,6 +350,12 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf, reconSafeModeMgrTask = new ReconSafeModeMgrTask( containerManager, nodeManager, safeModeManager, reconTaskConfig, ozoneConfiguration); + + containerSyncHelper = new ReconStorageContainerSyncHelper( + scmServiceProvider, + ozoneConfiguration, + containerManager + ); } /** @@ -534,71 +538,13 @@ public void updateReconSCMDBWithNewSnapshot() throws IOException { } public boolean syncWithSCMContainerInfo() - throws IOException { + throws Exception { if (isSyncDataFromSCMRunning.compareAndSet(false, true)) { - try { - List containerIDs = containerManager.getContainerIDs(); - - long totalContainerCount = scmServiceProvider.getContainerCount( - HddsProtos.LifeCycleState.CLOSED); - long containerCountPerCall = - getContainerCountPerCall(totalContainerCount); - ContainerID startContainerId = ContainerID.valueOf(1); - long retrievedContainerCount = 0; - if (totalContainerCount > 0) { - while (retrievedContainerCount < totalContainerCount) { - List listOfContainers = scmServiceProvider. 
- getListOfContainerIDs(startContainerId, - Long.valueOf(containerCountPerCall).intValue(), - HddsProtos.LifeCycleState.CLOSED); - if (null != listOfContainers && !listOfContainers.isEmpty()) { - LOG.info("Got list of containers from SCM : " + - listOfContainers.size()); - listOfContainers.forEach(containerID -> { - boolean isContainerPresentAtRecon = - containerIDs.contains(containerID); - if (!isContainerPresentAtRecon) { - try { - ContainerWithPipeline containerWithPipeline = - scmServiceProvider.getContainerWithPipeline( - containerID.getId()); - containerManager.addNewContainer(containerWithPipeline); - } catch (IOException e) { - LOG.error("Could not get container with pipeline " + - "for container : {}", containerID); - } - } - }); - long lastID = listOfContainers.get(listOfContainers.size() - 1).getId(); - startContainerId = ContainerID.valueOf(lastID + 1); - } else { - LOG.info("No containers found at SCM in CLOSED state"); - return false; - } - retrievedContainerCount += containerCountPerCall; - } - } - } catch (IOException e) { - LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e); - return false; - } + return containerSyncHelper.syncWithSCMContainerInfo(); } else { LOG.debug("SCM DB sync is already running."); return false; } - return true; - } - - private long getContainerCountPerCall(long totalContainerCount) { - // Assumption of size of 1 container info object here is 1 MB - long containersMetaDataTotalRpcRespSizeMB = - CONTAINER_METADATA_SIZE * totalContainerCount; - long hadoopRPCSize = ozoneConfiguration.getInt(IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT); - long containerCountPerCall = containersMetaDataTotalRpcRespSizeMB <= - hadoopRPCSize ? 
totalContainerCount : - Math.round(Math.floor( - hadoopRPCSize / (double) CONTAINER_METADATA_SIZE)); - return containerCountPerCall; } private void deleteOldSCMDB() throws IOException { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java new file mode 100644 index 000000000000..7dce78876e88 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.scm; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT; +import static org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade.CONTAINER_METADATA_SIZE; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ReconStorageContainerSyncHelper { + + private static final Logger LOG = LoggerFactory + .getLogger(ReconStorageContainerSyncHelper.class); + + private final StorageContainerServiceProvider scmServiceProvider; + private final OzoneConfiguration ozoneConfiguration; + private final ReconContainerManager containerManager; + + ReconStorageContainerSyncHelper(StorageContainerServiceProvider scmServiceProvider, + OzoneConfiguration ozoneConfiguration, + ReconContainerManager containerManager) { + this.scmServiceProvider = scmServiceProvider; + this.ozoneConfiguration = ozoneConfiguration; + this.containerManager = containerManager; + } + + public boolean syncWithSCMContainerInfo() throws Exception { + try { + long totalContainerCount = scmServiceProvider.getContainerCount( + HddsProtos.LifeCycleState.CLOSED); + long containerCountPerCall = + getContainerCountPerCall(totalContainerCount); + ContainerID startContainerId = ContainerID.valueOf(1); + long retrievedContainerCount = 0; + if (totalContainerCount > 0) { + while (retrievedContainerCount < totalContainerCount) { + List listOfContainers = scmServiceProvider. 
+ getListOfContainerIDs(startContainerId, + Long.valueOf(containerCountPerCall).intValue(), + HddsProtos.LifeCycleState.CLOSED); + if (null != listOfContainers && !listOfContainers.isEmpty()) { + LOG.info("Got list of containers from SCM : {}", listOfContainers.size()); + listOfContainers.forEach(containerID -> { + boolean isContainerPresentAtRecon = containerManager.containerExist(containerID); + if (!isContainerPresentAtRecon) { + try { + ContainerWithPipeline containerWithPipeline = + scmServiceProvider.getContainerWithPipeline( + containerID.getId()); + containerManager.addNewContainer(containerWithPipeline); + } catch (IOException e) { + LOG.error("Could not get container with pipeline " + + "for container : {}", containerID); + } + } + }); + long lastID = listOfContainers.get(listOfContainers.size() - 1).getId(); + startContainerId = ContainerID.valueOf(lastID + 1); + } else { + LOG.info("No containers found at SCM in CLOSED state"); + return false; + } + retrievedContainerCount += containerCountPerCall; + } + } + } catch (Exception e) { + LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e); + return false; + } + return true; + } + + private long getContainerCountPerCall(long totalContainerCount) { + // Assumption of size of 1 container info object here is 1 MB + long containersMetaDataTotalRpcRespSizeMB = + CONTAINER_METADATA_SIZE * totalContainerCount; + long hadoopRPCSize = ozoneConfiguration.getInt(IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT); + long containerCountPerCall = containersMetaDataTotalRpcRespSizeMB <= + hadoopRPCSize ? 
totalContainerCount : + Math.round(Math.floor( + hadoopRPCSize / (double) CONTAINER_METADATA_SIZE)); + return containerCountPerCall; + } + + + +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java index 6cd15f0c004d..9e73c30edb81 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; @@ -94,20 +93,6 @@ List getListOfContainerIDs(ContainerID startContainerID, HddsProtos.LifeCycleState state) throws IOException; - /** - * Get the list of containers from SCM. This is a RPC call. - * - * @param startContainerID the start container id - * @param count the number of containers to return - * @param state the containers in given state to be returned - * @return the list of containers from SCM in a given state - * @throws IOException - */ - List getListOfContainers(long startContainerID, - int count, - HddsProtos.LifeCycleState state) - throws IOException; - /** * Requests SCM for container count for a given state. * @return Total number of containers in SCM. 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java index 299a1c769ba9..6d4e31042341 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.ha.InterSCMGrpcClient; import org.apache.hadoop.hdds.scm.ha.SCMSnapshotDownloader; @@ -191,12 +190,4 @@ public List getListOfContainerIDs( throws IOException { return scmClient.getListOfContainerIDs(startContainerID, count, state); } - - @Override - public List getListOfContainers( - long startContainerID, int count, HddsProtos.LifeCycleState state) - throws IOException { - return scmClient.getListOfContainers(startContainerID, count, state); - } - } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java new file mode 100644 index 000000000000..c79d153af5bb --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.scm; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; +import org.junit.jupiter.api.Test; + +class TestReconStorageContainerSyncHelper { + + private final StorageContainerServiceProvider 
mockScmServiceProvider = + mock(StorageContainerServiceProvider.class); + + private final ReconContainerManager mockContainerManager = + mock(ReconContainerManager.class); + + private final ReconStorageContainerSyncHelper syncHelper; + + TestReconStorageContainerSyncHelper() { + syncHelper = new ReconStorageContainerSyncHelper( + mockScmServiceProvider, + new OzoneConfiguration(), + mockContainerManager + ); + } + + @Test + void testContainerMissingFromReconIsAdded() throws Exception { + ContainerID cid = ContainerID.valueOf(42L); + ContainerInfo info = new ContainerInfo.Builder() + .setContainerID(42L) + .setState(CLOSED) + .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE)) + .setOwner("test") + .build(); + ContainerWithPipeline cwp = new ContainerWithPipeline(info, null); + + when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L); + when(mockScmServiceProvider.getListOfContainerIDs( + eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED))) + .thenReturn(Collections.singletonList(cid)); + when(mockContainerManager.containerExist(cid)).thenReturn(false); + when(mockScmServiceProvider.getContainerWithPipeline(42L)).thenReturn(cwp); + + boolean result = syncHelper.syncWithSCMContainerInfo(); + + assertTrue(result); + verify(mockScmServiceProvider).getContainerWithPipeline(42L); + verify(mockContainerManager).addNewContainer(cwp); + } + + @Test + void testContainerAlreadyInReconIsSkipped() throws Exception { + ContainerID cid = ContainerID.valueOf(7L); + + when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L); + when(mockScmServiceProvider.getListOfContainerIDs( + eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED))) + .thenReturn(Collections.singletonList(cid)); + when(mockContainerManager.containerExist(cid)).thenReturn(true); + + boolean result = syncHelper.syncWithSCMContainerInfo(); + + assertTrue(result); + verify(mockScmServiceProvider, never()).getContainerWithPipeline(anyLong()); + verify(mockContainerManager, 
never()).addNewContainer(any()); + } + + @Test + void testZeroClosedContainersReturnsTrue() throws Exception { + when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(0L); + + boolean result = syncHelper.syncWithSCMContainerInfo(); + + assertTrue(result); + verifyNoInteractions(mockContainerManager); + verify(mockScmServiceProvider, never()) + .getListOfContainerIDs(any(), any(Integer.class), any()); + } + + @Test + void testEmptyListFromSCMReturnsFalse() throws Exception { + when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L); + when(mockScmServiceProvider.getListOfContainerIDs( + eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED))) + .thenReturn(Collections.emptyList()); + + boolean result = syncHelper.syncWithSCMContainerInfo(); + + assertFalse(result); + verifyNoInteractions(mockContainerManager); + } + +} From f82519671e97f93785498cf927315affc8549f4f Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Mon, 2 Mar 2026 16:12:29 +0000 Subject: [PATCH 05/10] HDDS-14730. 
Using container ids for recon sync --- .../hdds/scm/container/states/TestContainerStateMap.java | 3 --- .../ozone/recon/scm/ReconStorageContainerSyncHelper.java | 3 --- 2 files changed, 6 deletions(-) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java index b9fbfbbac032..c38c3c211bd9 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java @@ -73,7 +73,4 @@ private ContainerInfo buildContainerInfo(long containerID, HddsProtos.LifeCycleS .setReplicationConfig(StandaloneReplicationConfig.getInstance(THREE)) .build(); } - - - } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java index 7dce78876e88..032cf5706c0c 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java @@ -105,7 +105,4 @@ private long getContainerCountPerCall(long totalContainerCount) { hadoopRPCSize / (double) CONTAINER_METADATA_SIZE)); return containerCountPerCall; } - - - } From 0d213a4766f2ad45449582699df9f18a72c9c3f3 Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Wed, 4 Mar 2026 17:24:27 +0000 Subject: [PATCH 06/10] HDDS-14730. 
Using container ids for recon sync --- .../hdds/scm/container/ContainerManager.java | 4 +- .../ReconStorageContainerManagerFacade.java | 6 +- .../scm/ReconStorageContainerSyncHelper.java | 10 ++-- .../TestReconStorageContainerSyncHelper.java | 57 +++++++++++++++++++ 4 files changed, 65 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index 88d4831ee4f1..11d4f0cc6155 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -74,14 +74,14 @@ default List getContainers() { /** * Returns containers under certain conditions. - * Search container IDs from start ID(exclusive), + * Search container IDs from start ID(inclusive), * The max size of the searching range cannot exceed the * value of count. * * @param startID start containerID, >=0, * start searching at the head if 0. * @param count count must be >= 0 - * Usually the count will be replace with a very big + * Usually the count will be replaced with a very big * value instead of being unlimited in case the db is very big. * * @return a list of container. 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java index 87503c90ab81..67cfe81f7e40 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java @@ -136,9 +136,6 @@ public class ReconStorageContainerManagerFacade private static final Logger LOG = LoggerFactory .getLogger(ReconStorageContainerManagerFacade.class); - public static final long CONTAINER_METADATA_SIZE = 1 * 1024 * 1024L; - private static final String IPC_MAXIMUM_DATA_LENGTH = "ipc.maximum.data.length"; - private static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 128 * 1024 * 1024; private final OzoneConfiguration ozoneConfiguration; private final ReconDatanodeProtocolServer datanodeProtocolServer; @@ -537,8 +534,7 @@ public void updateReconSCMDBWithNewSnapshot() throws IOException { } } - public boolean syncWithSCMContainerInfo() - throws Exception { + public boolean syncWithSCMContainerInfo() { if (isSyncDataFromSCMRunning.compareAndSet(false, true)) { return containerSyncHelper.syncWithSCMContainerInfo(); } else { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java index 032cf5706c0c..4de6b298ff7f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java @@ -17,10 +17,6 @@ package org.apache.hadoop.ozone.recon.scm; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH; -import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT; -import static org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade.CONTAINER_METADATA_SIZE; - import java.io.IOException; import java.util.List; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -33,6 +29,10 @@ class ReconStorageContainerSyncHelper { + public static final long CONTAINER_METADATA_SIZE = 1 * 1024 * 1024L; + private static final String IPC_MAXIMUM_DATA_LENGTH = "ipc.maximum.data.length"; + private static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 128 * 1024 * 1024; + private static final Logger LOG = LoggerFactory .getLogger(ReconStorageContainerSyncHelper.class); @@ -48,7 +48,7 @@ class ReconStorageContainerSyncHelper { this.containerManager = containerManager; } - public boolean syncWithSCMContainerInfo() throws Exception { + public boolean syncWithSCMContainerInfo() { try { long totalContainerCount = scmServiceProvider.getContainerCount( HddsProtos.LifeCycleState.CLOSED); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java index c79d153af5bb..23f16b715861 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -30,6 +31,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import 
java.util.Arrays; import java.util.Collections; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -82,6 +84,61 @@ void testContainerMissingFromReconIsAdded() throws Exception { verify(mockContainerManager).addNewContainer(cwp); } + @Test + void testContainerMissingFromReconIsAddedWhenMultiplePages() throws Exception { + // Force containerCountPerCall = 2 by capping the RPC size to 2MB + OzoneConfiguration pagedConf = new OzoneConfiguration(); + pagedConf.setInt("ipc.maximum.data.length", 2 * 1024 * 1024); + ReconStorageContainerSyncHelper pagedHelper = new ReconStorageContainerSyncHelper( + mockScmServiceProvider, pagedConf, mockContainerManager); + + // Page 1: containers 1 and 2 (both missing from Recon) + ContainerID cid1 = ContainerID.valueOf(1L); + ContainerID cid2 = ContainerID.valueOf(2L); + ContainerInfo info1 = new ContainerInfo.Builder() + .setContainerID(1L).setState(CLOSED) + .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE)) + .setOwner("test").build(); + ContainerInfo info2 = new ContainerInfo.Builder() + .setContainerID(2L).setState(CLOSED) + .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE)) + .setOwner("test").build(); + ContainerWithPipeline cwp1 = new ContainerWithPipeline(info1, null); + ContainerWithPipeline cwp2 = new ContainerWithPipeline(info2, null); + + // Page 2: container 3 (already in Recon) + ContainerID cid3 = ContainerID.valueOf(3L); + + when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(3L); + when(mockScmServiceProvider.getListOfContainerIDs( + eq(ContainerID.valueOf(1L)), eq(2), eq(CLOSED))) + .thenReturn(Arrays.asList(cid1, cid2)); + when(mockScmServiceProvider.getListOfContainerIDs( + eq(ContainerID.valueOf(3L)), eq(2), eq(CLOSED))) + .thenReturn(Collections.singletonList(cid3)); + + when(mockContainerManager.containerExist(cid1)).thenReturn(false); + 
when(mockContainerManager.containerExist(cid2)).thenReturn(false); + when(mockContainerManager.containerExist(cid3)).thenReturn(true); + when(mockScmServiceProvider.getContainerWithPipeline(1L)).thenReturn(cwp1); + when(mockScmServiceProvider.getContainerWithPipeline(2L)).thenReturn(cwp2); + + boolean result = pagedHelper.syncWithSCMContainerInfo(); + + assertTrue(result); + // Page 1: both missing containers were added + verify(mockContainerManager).addNewContainer(cwp1); + verify(mockContainerManager).addNewContainer(cwp2); + // Page 2: present container was skipped + verify(mockContainerManager, never()).addNewContainer( + argThat(cwp -> cwp.getContainerInfo().getContainerID() == 3L)); + // Both pages were fetched + verify(mockScmServiceProvider).getListOfContainerIDs( + eq(ContainerID.valueOf(1L)), eq(2), eq(CLOSED)); + verify(mockScmServiceProvider).getListOfContainerIDs( + eq(ContainerID.valueOf(3L)), eq(2), eq(CLOSED)); + } + @Test void testContainerAlreadyInReconIsSkipped() throws Exception { ContainerID cid = ContainerID.valueOf(7L); From 18fcbeafd64a54ab676c12c3a69fee18738267e7 Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Tue, 10 Mar 2026 16:23:36 +0000 Subject: [PATCH 07/10] HDDS-14730. 
Update batch size for Recon to SCM sync using container ids --- .../ozone/recon/ReconServerConfigKeys.java | 10 ++++++++++ .../scm/ReconStorageContainerSyncHelper.java | 20 ++++++++++++------- .../TestReconStorageContainerSyncHelper.java | 5 +++-- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index a57095d2e4c7..9e1e302b92f1 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -221,6 +221,16 @@ public final class ReconServerConfigKeys { "ozone.recon.dn.metrics.collection.timeout"; public static final String OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT = "10m"; + /** + * Maximum number of ContainerIDs to fetch from SCM per RPC call + * during container sync. Each ContainerID is approximately 12 bytes + * on the wire. Reduce this value on memory-constrained Recon nodes. + * Default: 1,000,000 (~32MB heap per batch, 4 calls for a 4M container cluster) + */ + public static final String OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE = + "ozone.recon.scm.container.id.batch.size"; + public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000; + /** * Private constructor for utility class. 
*/ diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java index 4de6b298ff7f..8e0339b2d9e0 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java @@ -17,6 +17,9 @@ package org.apache.hadoop.ozone.recon.scm; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT; + import java.io.IOException; import java.util.List; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -29,9 +32,9 @@ class ReconStorageContainerSyncHelper { - public static final long CONTAINER_METADATA_SIZE = 1 * 1024 * 1024L; private static final String IPC_MAXIMUM_DATA_LENGTH = "ipc.maximum.data.length"; private static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 128 * 1024 * 1024; + private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12; private static final Logger LOG = LoggerFactory .getLogger(ReconStorageContainerSyncHelper.class); @@ -95,14 +98,17 @@ public boolean syncWithSCMContainerInfo() { } private long getContainerCountPerCall(long totalContainerCount) { - // Assumption of size of 1 container info object here is 1 MB - long containersMetaDataTotalRpcRespSizeMB = - CONTAINER_METADATA_SIZE * totalContainerCount; + // Assumption of size of 1 ContainerID proto here is 12 bytes + long totalIdsSizeBytes = CONTAINER_ID_PROTO_SIZE_BYTES * totalContainerCount; long hadoopRPCSize = ozoneConfiguration.getInt(IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT); - long containerCountPerCall = containersMetaDataTotalRpcRespSizeMB <= + long countByRpc = totalIdsSizeBytes <= hadoopRPCSize ? 
totalContainerCount : Math.round(Math.floor( - hadoopRPCSize / (double) CONTAINER_METADATA_SIZE)); - return containerCountPerCall; + hadoopRPCSize / (double) CONTAINER_ID_PROTO_SIZE_BYTES)); + + long containerIdMaxBatchSize = ozoneConfiguration.getLong( + OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE, + OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT); + return Math.min(countByRpc, containerIdMaxBatchSize); } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java index 23f16b715861..9ba0d85a931b 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -86,9 +87,9 @@ void testContainerMissingFromReconIsAdded() throws Exception { @Test void testContainerMissingFromReconIsAddedWhenMultiplePages() throws Exception { - // Force containerCountPerCall = 2 by capping the RPC size to 2MB + // Force containerCountPerCall = 2 via the batch size config OzoneConfiguration pagedConf = new OzoneConfiguration(); - pagedConf.setInt("ipc.maximum.data.length", 2 * 1024 * 1024); + pagedConf.setLong(OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE, 2L); ReconStorageContainerSyncHelper pagedHelper = new ReconStorageContainerSyncHelper( mockScmServiceProvider, pagedConf, mockContainerManager); 
From 7d2826826b0a5007b58ec96321cdb1434e19ee3d Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Wed, 18 Mar 2026 15:24:30 +0000 Subject: [PATCH 08/10] HDDS-14730. Updating recon container sync rpc message defaults --- .../ozone/recon/ReconServerConfigKeys.java | 4 ++-- .../scm/ReconStorageContainerSyncHelper.java | 19 +++++++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index 9e1e302b92f1..fe172f93094f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -225,11 +225,11 @@ public final class ReconServerConfigKeys { * Maximum number of ContainerIDs to fetch from SCM per RPC call * during container sync. Each ContainerID is approximately 12 bytes * on the wire. Reduce this value on memory-constrained Recon nodes. - * Default: 1,000,000 (~32MB heap per batch, 4 calls for a 4M container cluster) + * Default: 500,000 (~16MB heap per batch, 8 calls for a 4M container cluster) */ public static final String OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE = "ozone.recon.scm.container.id.batch.size"; - public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000; + public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 500_000; /** * Private constructor for utility class. 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java index 8e0339b2d9e0..1fbd1d5d638a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java @@ -34,6 +34,8 @@ class ReconStorageContainerSyncHelper { private static final String IPC_MAXIMUM_DATA_LENGTH = "ipc.maximum.data.length"; private static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 128 * 1024 * 1024; + + // Assumption of size of 1 ContainerID proto here is 12 bytes private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12; private static final Logger LOG = LoggerFactory @@ -98,17 +100,14 @@ public boolean syncWithSCMContainerInfo() { } private long getContainerCountPerCall(long totalContainerCount) { - // Assumption of size of 1 ContainerID proto here is 12 bytes - long totalIdsSizeBytes = CONTAINER_ID_PROTO_SIZE_BYTES * totalContainerCount; - long hadoopRPCSize = ozoneConfiguration.getInt(IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT); - long countByRpc = totalIdsSizeBytes <= - hadoopRPCSize ? 
totalContainerCount : - Math.round(Math.floor( - hadoopRPCSize / (double) CONTAINER_ID_PROTO_SIZE_BYTES)); - - long containerIdMaxBatchSize = ozoneConfiguration.getLong( + long hadoopRPCSize = ozoneConfiguration.getInt( + IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT); + long countByRpcLimit = hadoopRPCSize / CONTAINER_ID_PROTO_SIZE_BYTES; + long countByBatchLimit = ozoneConfiguration.getLong( OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE, OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT); - return Math.min(countByRpc, containerIdMaxBatchSize); + + long batchSize = Math.min(countByRpcLimit, countByBatchLimit); + return Math.min(totalContainerCount, batchSize); } } From 8575141db56560b6f68ed3d9ec536af121e89c88 Mon Sep 17 00:00:00 2001 From: Jason O'Sullivan Date: Thu, 19 Mar 2026 13:03:43 +0000 Subject: [PATCH 09/10] HDDS-14730. Updating recon container sync rpc message defaults --- .../org/apache/hadoop/ozone/TestOzoneConfigurationFields.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 98ccd8fac8be..be9f1ac00e96 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -113,6 +113,7 @@ private void addPropertiesNotInXml() { ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY, ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY, ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM, + ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE, OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, OMConfigKeys.OZONE_OM_HA_PREFIX, OMConfigKeys.OZONE_OM_GRPC_PORT_KEY, From 1af709a5613d80249129abf92c25e8451f753b99 Mon Sep 17 00:00:00 2001 From: Jason 
O'Sullivan Date: Wed, 25 Mar 2026 10:42:10 +0000 Subject: [PATCH 10/10] HDDS-14730. Changing ScmClientProtocolServer.getContainerCount(state) to use ContainerManager.getContainerStateCount(state) --- .../scm/server/SCMClientProtocolServer.java | 2 +- .../server/TestSCMClientProtocolServer.java | 31 ++++++++++++++---- .../ozone/recon/ReconServerConfigKeys.java | 32 ++++++++++++++++--- .../scm/ReconStorageContainerSyncHelper.java | 8 ++--- 4 files changed, 57 insertions(+), 16 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 31a0f9dc8dfa..b161e0e84d76 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1502,7 +1502,7 @@ public long getContainerCount(HddsProtos.LifeCycleState state) auditMap.put("state", String.valueOf(state)); try { - long count = scm.getContainerManager().getContainers(state).size(); + long count = scm.getContainerManager().getContainerStateCount(state); AUDIT.logReadSuccess(buildAuditMessageForSuccess( SCMAction.GET_CONTAINER_COUNT, auditMap)); return count; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java index 9402218014ce..7d2f399d1faf 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java @@ -17,10 +17,12 @@ package org.apache.hadoop.hdds.scm.server; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,6 +35,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; @@ -130,12 +133,27 @@ public void testScmListContainer() throws Exception { SCMClientProtocolServer scmServer = new SCMClientProtocolServer(new OzoneConfiguration(), mockStorageContainerManager(), mock(ReconfigurationHandler.class)); + try { + assertEquals(10, scmServer.listContainer(1, 10, + null, HddsProtos.ReplicationType.RATIS, null).getContainerInfoList().size()); + // Test call from a legacy client, which uses a different method of listContainer + assertEquals(10, scmServer.listContainer(1, 10, null, + HddsProtos.ReplicationFactor.THREE).getContainerInfoList().size()); + } finally { + scmServer.stop(); + } + } - assertEquals(10, scmServer.listContainer(1, 10, - null, HddsProtos.ReplicationType.RATIS, null).getContainerInfoList().size()); - // Test call from a legacy client, which uses a different method of listContainer - assertEquals(10, scmServer.listContainer(1, 10, null, - HddsProtos.ReplicationFactor.THREE).getContainerInfoList().size()); + @Test + public void testScmGetContainerCount() throws IOException { + SCMClientProtocolServer scmServer = + new 
SCMClientProtocolServer(new OzoneConfiguration(), + mockStorageContainerManager(), mock(ReconfigurationHandler.class)); + try { + assertEquals(10, scmServer.getContainerCount(CLOSED)); + } finally { + scmServer.stop(); + } } private StorageContainerManager mockStorageContainerManager() { @@ -145,11 +163,12 @@ private StorageContainerManager mockStorageContainerManager() { } ContainerManagerImpl containerManager = mock(ContainerManagerImpl.class); when(containerManager.getContainers()).thenReturn(infos); + when(containerManager.getContainerStateCount(any(LifeCycleState.class))).thenReturn(infos.size()); StorageContainerManager storageContainerManager = mock(StorageContainerManager.class); when(storageContainerManager.getContainerManager()).thenReturn(containerManager); SCMNodeDetails scmNodeDetails = mock(SCMNodeDetails.class); - when(scmNodeDetails.getClientProtocolServerAddress()).thenReturn(new InetSocketAddress("localhost", 9876)); + when(scmNodeDetails.getClientProtocolServerAddress()).thenReturn(new InetSocketAddress("localhost", 0)); when(scmNodeDetails.getClientProtocolServerAddressKey()).thenReturn("test"); when(storageContainerManager.getScmNodeDetails()).thenReturn(scmNodeDetails); return storageContainerManager; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index fe172f93094f..b4da42d8f03a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -222,14 +222,36 @@ public final class ReconServerConfigKeys { public static final String OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT = "10m"; /** - * Maximum number of ContainerIDs to fetch from SCM per RPC call - * during container sync. Each ContainerID is approximately 12 bytes - * on the wire. 
Reduce this value on memory-constrained Recon nodes. - * Default: 500,000 (~16MB heap per batch, 8 calls for a 4M container cluster) + * Application-level ceiling on the number of ContainerIDs fetched from SCM + * per RPC call during container sync. The effective batch size is + * {@code min(this value, ipc.maximum.data.length / 12, totalContainerCount)}, + * so raising this above the default is only meaningful if + * {@code ipc.maximum.data.length} has also been raised from its default. + * + *
<p>
Recon wire cost: each ContainerID is ~12 bytes on the wire, so + * the default 1,000,000 produces ~12 MB per RPC. + * + *
<p>
Recon JVM heap: each deserialized {@code ContainerID} object + * occupies ~32 bytes, so the default batch requires ~32 MB of heap on Recon. + * Reduce this value on memory-constrained Recon nodes. + * + *
<p>
SCM-side pressure: on each RPC call SCM holds its container + * state read lock (a fair {@link java.util.concurrent.locks.ReentrantReadWriteLock}) + * for the full duration of streaming N entries from its in-memory + * {@link java.util.TreeMap} and collecting them into a response list. + * Because the lock is fair, any concurrent write (container allocation, + * state transition) queuing for the write lock will be blocked for the + * entire batch duration — and new reads queue behind that waiting writer. + * Larger batches therefore increase worst-case container-allocation latency + * on SCM during sync. On write-heavy SCM nodes, prefer smaller batches with + * more calls over fewer large batches. + * + *
<p>
Default: 1,000,000 (~12 MB wire, ~32 MB JVM heap per batch on Recon; + * 4 calls for a 4 M-container cluster) */ public static final String OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE = "ozone.recon.scm.container.id.batch.size"; - public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 500_000; + public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000; /** * Private constructor for utility class. diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java index 1fbd1d5d638a..c8d940aa8357 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.recon.scm; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT; @@ -32,10 +34,8 @@ class ReconStorageContainerSyncHelper { - private static final String IPC_MAXIMUM_DATA_LENGTH = "ipc.maximum.data.length"; - private static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 128 * 1024 * 1024; - - // Assumption of size of 1 ContainerID proto here is 12 bytes + // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte long = ~12 bytes). + // Used to derive the maximum batch size that fits within ipc.maximum.data.length. private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12; private static final Logger LOG = LoggerFactory