diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java index 744e842a398a..316c88aba57b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java @@ -149,12 +149,17 @@ private long getCurrentTime() { return clock.millis(); } + public long getExpiryDurationMillis() { + return expiryDurationMillis; + } + /** - * A custom monotonic clock implementation. + * A custom monotonic clock implementation to allow overriding the current time for testing purposes. * Implementation of Clock that uses System.nanoTime() for real usage. - * See {@see org.apache.ozone.test.TestClock} + * The class {@code org.apache.ozone.test.TestClock} provides a mock clock which can be used + * to manipulate the current time in tests. */ - private static final class MonotonicClock extends Clock { + public static final class MonotonicClock extends Clock { @Override public long millis() { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 41f6d36971ff..51af0a30df31 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -26,6 +26,7 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONFIG_PREFIX; import java.time.Duration; +import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.hadoop.hdds.conf.Config; import 
org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; @@ -61,6 +62,7 @@ public class DatanodeConfiguration extends ReconfigurableConfig { public static final String FAILED_DB_VOLUMES_TOLERATED_KEY = "hdds.datanode.failed.db.volumes.tolerated"; public static final String DISK_CHECK_MIN_GAP_KEY = "hdds.datanode.disk.check.min.gap"; public static final String DISK_CHECK_TIMEOUT_KEY = "hdds.datanode.disk.check.timeout"; + public static final String DISK_CHECK_SLIDING_WINDOW_TIMEOUT_KEY = "hdds.datanode.disk.check.sliding.window.timeout"; // Minimum space should be left on volume. // Ex: If volume has 1000GB and minFreeSpace is configured as 10GB, @@ -99,6 +101,9 @@ public class DatanodeConfiguration extends ReconfigurableConfig { static final Duration DISK_CHECK_TIMEOUT_DEFAULT = Duration.ofMinutes(10); + static final Duration DISK_CHECK_SLIDING_WINDOW_TIMEOUT_DEFAULT = + Duration.ofMinutes(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT).plus(DISK_CHECK_TIMEOUT_DEFAULT); + static final boolean CONTAINER_SCHEMA_V3_ENABLED_DEFAULT = true; static final long ROCKSDB_LOG_MAX_FILE_SIZE_BYTES_DEFAULT = 32 * 1024 * 1024; static final int ROCKSDB_LOG_MAX_FILE_NUM_DEFAULT = 64; @@ -404,6 +409,19 @@ public class DatanodeConfiguration extends ReconfigurableConfig { ) private Duration diskCheckTimeout = DISK_CHECK_TIMEOUT_DEFAULT; + @Config(key = "hdds.datanode.disk.check.sliding.window.timeout", + defaultValue = "70m", + type = ConfigType.TIME, + tags = {ConfigTag.DATANODE}, + description = "Time interval after which a disk check" + + " failure result stored in the sliding window will expire." + + " Do not set the window timeout period to less than or equal to the disk check interval period," + + " or failures can be missed across sparse checks;" + + " e.g., every 120m interval with a 60m window rarely accumulates enough failed events." + + " Unit could be defined with postfix (ns,ms,s,m,h,d)." 
+ ) + private Duration diskCheckSlidingWindowTimeout = DISK_CHECK_SLIDING_WINDOW_TIMEOUT_DEFAULT; + @Config(key = "hdds.datanode.chunk.data.validation.check", defaultValue = "false", type = ConfigType.BOOLEAN, @@ -688,6 +706,23 @@ public void validate() { diskCheckTimeout = DISK_CHECK_TIMEOUT_DEFAULT; } + if (diskCheckSlidingWindowTimeout.isNegative()) { + LOG.warn("{} must be greater than zero and was set to {}. Defaulting to {}", + DISK_CHECK_SLIDING_WINDOW_TIMEOUT_KEY, diskCheckSlidingWindowTimeout, + DISK_CHECK_SLIDING_WINDOW_TIMEOUT_DEFAULT); + diskCheckSlidingWindowTimeout = DISK_CHECK_SLIDING_WINDOW_TIMEOUT_DEFAULT; + } + + // Do not set window timeout <= periodic disk check interval period, or failures can be missed across sparse checks + // e.g., every 120m interval with a 60m window rarely accumulates enough failed events + if (diskCheckSlidingWindowTimeout.compareTo(Duration.ofMinutes(periodicDiskCheckIntervalMinutes)) < 0) { + LOG.warn("{} must be greater than or equal to {} minutes and was set to {} minutes. Defaulting to {}", + DISK_CHECK_SLIDING_WINDOW_TIMEOUT_KEY, periodicDiskCheckIntervalMinutes, + diskCheckSlidingWindowTimeout.toMinutes(), + DurationFormatUtils.formatDurationHMS(DISK_CHECK_SLIDING_WINDOW_TIMEOUT_DEFAULT.toMillis())); + diskCheckSlidingWindowTimeout = DISK_CHECK_SLIDING_WINDOW_TIMEOUT_DEFAULT; + } + if (blockDeleteCommandWorkerInterval.isNegative()) { LOG.warn(BLOCK_DELETE_COMMAND_WORKER_INTERVAL + " must be greater than zero and was set to {}. 
Defaulting to {}", @@ -907,6 +942,14 @@ public void setDiskCheckTimeout(Duration duration) { diskCheckTimeout = duration; } + public Duration getDiskCheckSlidingWindowTimeout() { + return diskCheckSlidingWindowTimeout; + } + + public void setDiskCheckSlidingWindowTimeout(Duration duration) { + diskCheckSlidingWindowTimeout = duration; + } + public int getBlockDeleteThreads() { return blockDeleteThreads; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index f331db7defc3..6b3f65b4b57f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -26,13 +26,10 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import org.apache.commons.io.FileUtils; @@ -110,11 +107,6 @@ public class HddsVolume extends StorageVolume { private final AtomicBoolean dbLoaded = new AtomicBoolean(false); private final AtomicBoolean dbLoadFailure = new AtomicBoolean(false); - private final int volumeTestCount; - private final int volumeTestFailureTolerance; - private AtomicInteger volumeTestFailureCount; - private Queue volumeTestResultQueue; - /** * Builder for HddsVolume. 
*/ @@ -147,11 +139,6 @@ private HddsVolume(Builder b) throws IOException { this.volumeInfoMetrics = new VolumeInfoMetrics(b.getVolumeRootStr(), this); - this.volumeTestCount = getDatanodeConfig().getVolumeIOTestCount(); - this.volumeTestFailureTolerance = getDatanodeConfig().getVolumeIOFailureTolerance(); - this.volumeTestFailureCount = new AtomicInteger(0); - this.volumeTestResultQueue = new LinkedList<>(); - initialize(); } else { // Builder is called with failedVolume set, so create a failed volume @@ -159,8 +146,6 @@ private HddsVolume(Builder b) throws IOException { this.setState(VolumeState.FAILED); volumeIOStats = null; volumeInfoMetrics = new VolumeInfoMetrics(b.getVolumeRootStr(), this); - this.volumeTestCount = 0; - this.volumeTestFailureTolerance = 0; } LOG.info("HddsVolume: {}", getReport()); @@ -322,38 +307,32 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused) @VisibleForTesting public VolumeCheckResult checkDbHealth(File dbFile) throws InterruptedException { - if (volumeTestCount == 0) { + if (getIoTestCount() == 0) { return VolumeCheckResult.HEALTHY; } - final boolean isVolumeTestResultHealthy = true; try (ManagedOptions managedOptions = new ManagedOptions(); ManagedRocksDB ignored = ManagedRocksDB.openReadOnly(managedOptions, dbFile.toString())) { - volumeTestResultQueue.add(isVolumeTestResultHealthy); + // Do nothing. Only check if rocksdb is accessible. 
+ LOG.debug("Successfully opened the database at \"{}\" for HDDS volume {}.", dbFile, getStorageDir()); } catch (Exception e) { if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("Check of database for volume " + this + " interrupted."); } LOG.warn("Could not open Volume DB located at {}", dbFile, e); - volumeTestResultQueue.add(!isVolumeTestResultHealthy); - volumeTestFailureCount.incrementAndGet(); - } - - if (volumeTestResultQueue.size() > volumeTestCount - && (Boolean.TRUE.equals(volumeTestResultQueue.poll()) != isVolumeTestResultHealthy)) { - volumeTestFailureCount.decrementAndGet(); + getIoTestSlidingWindow().add(); } - if (volumeTestFailureCount.get() > volumeTestFailureTolerance) { + if (getIoTestSlidingWindow().isExceeded()) { LOG.error("Failed to open the database at \"{}\" for HDDS volume {}: " + - "the last {} runs encountered {} out of {} tolerated failures.", - dbFile, this, volumeTestResultQueue.size(), volumeTestFailureCount.get(), volumeTestFailureTolerance); + "encountered more than the {} tolerated failures.", + dbFile, this, getIoTestSlidingWindow().getWindowSize()); return VolumeCheckResult.FAILED; } LOG.debug("Successfully opened the database at \"{}\" for HDDS volume {}: " + - "the last {} runs encountered {} out of {} tolerated failures", - dbFile, this, volumeTestResultQueue.size(), volumeTestFailureTolerance, volumeTestFailureTolerance); + "encountered {} out of {} tolerated failures", + dbFile, this, getIoTestSlidingWindow().getNumEventsInWindow(), getIoTestSlidingWindow().getWindowSize()); return VolumeCheckResult.HEALTHY; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java index 5260f8468930..01f0e351d722 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java 
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java @@ -27,12 +27,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.LinkedList; +import java.time.Clock; import java.util.Objects; import java.util.Properties; -import java.util.Queue; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; @@ -43,6 +41,7 @@ import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.hdds.utils.SlidingWindow; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.checker.Checkable; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; @@ -111,9 +110,7 @@ public abstract class StorageVolume implements Checkable ioTestSlidingWindow; + private SlidingWindow ioTestSlidingWindow; private int healthCheckFileSize; /** @@ -163,9 +160,8 @@ protected StorageVolume(Builder b) throws IOException { this.conf = b.conf; this.dnConf = conf.getObject(DatanodeConfiguration.class); this.ioTestCount = dnConf.getVolumeIOTestCount(); - this.ioFailureTolerance = dnConf.getVolumeIOFailureTolerance(); - this.ioTestSlidingWindow = new LinkedList<>(); - this.currentIOFailureCount = new AtomicInteger(0); + this.ioTestSlidingWindow = new SlidingWindow(dnConf.getVolumeIOFailureTolerance(), + dnConf.getDiskCheckSlidingWindowTimeout(), b.getClock()); this.healthCheckFileSize = dnConf.getVolumeHealthCheckFileSize(); } else { storageDir = new File(b.volumeRootStr); @@ -174,7 +170,6 @@ protected StorageVolume(Builder b) throws IOException { this.storageID = UUID.randomUUID().toString(); this.state = VolumeState.FAILED; this.ioTestCount = 0; - this.ioFailureTolerance = 0; this.conf = null; 
this.dnConf = null; } @@ -411,6 +406,7 @@ public abstract static class Builder> { private boolean failedVolume = false; private String datanodeUuid; private String clusterID; + private Clock clock = new SlidingWindow.MonotonicClock(); public Builder(String volumeRootStr, String storageDirStr) { this.volumeRootStr = volumeRootStr; @@ -457,6 +453,11 @@ public T clusterID(String cid) { return this.getThis(); } + public T clock(Clock c) { + this.clock = c; + return this.getThis(); + } + public abstract StorageVolume build() throws IOException; public String getVolumeRootStr() { @@ -474,6 +475,10 @@ public StorageType getStorageType() { public String getStorageDirStr() { return this.storageDirStr; } + + public Clock getClock() { + return this.clock; + } } public String getVolumeRootDir() { @@ -554,6 +559,14 @@ public VolumeSet getVolumeSet() { return this.volumeSet; } + public int getIoTestCount() { + return ioTestCount; + } + + public SlidingWindow getIoTestSlidingWindow() { + return ioTestSlidingWindow; + } + public StorageType getStorageType() { return storageType; } @@ -661,7 +674,7 @@ private void cleanTmpDiskCheckDir() { * check consists of a directory check and an IO check. * * If the directory check fails, the volume check fails immediately. - * The IO check is allows to fail up to {@code ioFailureTolerance} times + * The IO check is allowed to fail up to {@code ioFailureTolerance} times * out of the last {@code ioTestCount} IO checks before this volume check is * failed. Each call to this method runs one IO check. 
* @@ -698,7 +711,6 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused) // to avoid volume failure we can ignore checking disk read/write int minimumDiskSpace = healthCheckFileSize * 2; if (getCurrentUsage().getAvailable() < minimumDiskSpace) { - ioTestSlidingWindow.add(true); return VolumeCheckResult.HEALTHY; } @@ -717,39 +729,25 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused) // We can check again if disk is full. If it is full, // in this case keep volume as healthy so that READ can still be served if (!diskChecksPassed && getCurrentUsage().getAvailable() < minimumDiskSpace) { - ioTestSlidingWindow.add(true); return VolumeCheckResult.HEALTHY; } - // Move the sliding window of IO test results forward 1 by adding the - // latest entry and removing the oldest entry from the window. - // Update the failure counter for the new window. - ioTestSlidingWindow.add(diskChecksPassed); if (!diskChecksPassed) { - currentIOFailureCount.incrementAndGet(); - } - if (ioTestSlidingWindow.size() > ioTestCount && - Objects.equals(ioTestSlidingWindow.poll(), Boolean.FALSE)) { - currentIOFailureCount.decrementAndGet(); + ioTestSlidingWindow.add(); } - // If the failure threshold has been crossed, fail the volume without - // further scans. + // If the failure threshold has been crossed, fail the volume without further scans. // Once the volume is failed, it will not be checked anymore. // The failure counts can be left as is. 
- if (currentIOFailureCount.get() > ioFailureTolerance) { - LOG.error("Failed IO test for volume {}: the last {} runs " + - "encountered {} out of {} tolerated failures.", this, - ioTestSlidingWindow.size(), currentIOFailureCount, - ioFailureTolerance); + if (ioTestSlidingWindow.isExceeded()) { + LOG.error("Failed IO test for volume {}: encountered more than the {} tolerated failures within the past {} ms.", + this, ioTestSlidingWindow.getWindowSize(), ioTestSlidingWindow.getExpiryDurationMillis()); return VolumeCheckResult.FAILED; - } else if (LOG.isDebugEnabled()) { - LOG.debug("IO test results for volume {}: the last {} runs encountered " + - "{} out of {} tolerated failures", this, - ioTestSlidingWindow.size(), - currentIOFailureCount, ioFailureTolerance); } + LOG.debug("IO test results for volume {}: encountered {} out of {} tolerated failures", + this, ioTestSlidingWindow.getNumEventsInWindow(), ioTestSlidingWindow.getWindowSize()); + return VolumeCheckResult.HEALTHY; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java index 1c9b8bec8c8f..06d844ca9183 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java @@ -23,6 +23,7 @@ import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.UUID; import java.util.stream.Stream; import org.apache.commons.io.FileUtils; @@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import 
org.apache.hadoop.ozone.container.common.utils.DiskCheckUtil; +import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; @@ -48,6 +50,7 @@ public class TestStorageVolumeHealthChecks { private static final String DATANODE_UUID = UUID.randomUUID().toString(); private static final String CLUSTER_ID = UUID.randomUUID().toString(); private static final OzoneConfiguration CONF = new OzoneConfiguration(); + private static final TestClock TEST_CLOCK = TestClock.newInstance(); @TempDir private static Path volumePath; @@ -57,19 +60,22 @@ public static Stream volumeBuilders() { new HddsVolume.Builder(volumePath.toString()) .datanodeUuid(DATANODE_UUID) .conf(CONF) - .usageCheckFactory(MockSpaceUsageCheckFactory.NONE); + .usageCheckFactory(MockSpaceUsageCheckFactory.NONE) + .clock(TEST_CLOCK); MetadataVolume.Builder metadataVolumeBuilder = new MetadataVolume.Builder(volumePath.toString()) .datanodeUuid(DATANODE_UUID) .conf(CONF) - .usageCheckFactory(MockSpaceUsageCheckFactory.NONE); + .usageCheckFactory(MockSpaceUsageCheckFactory.NONE) + .clock(TEST_CLOCK); DbVolume.Builder dbVolumeBuilder = new DbVolume.Builder(volumePath.toString()) .datanodeUuid(DATANODE_UUID) .conf(CONF) - .usageCheckFactory(MockSpaceUsageCheckFactory.NONE); + .usageCheckFactory(MockSpaceUsageCheckFactory.NONE) + .clock(TEST_CLOCK); return Stream.of( Arguments.of(Named.of("HDDS Volume", hddsVolumeBuilder)), @@ -304,13 +310,21 @@ private void testCheckIOUntilFailure(StorageVolume.Builder builder, DatanodeConfiguration dnConf = CONF.getObject(DatanodeConfiguration.class); dnConf.setVolumeIOTestCount(ioTestCount); dnConf.setVolumeIOFailureTolerance(ioFailureTolerance); + dnConf.setDiskCheckSlidingWindowTimeout(Duration.ofMillis(ioTestCount)); CONF.setFromObject(dnConf); builder.conf(CONF); StorageVolume volume = builder.build(); volume.format(CLUSTER_ID); volume.createTmpDirs(CLUSTER_ID); + // Sliding window protocol 
transitioned from a count-based to a time-based system + // Update the event rate so that all the tested events are processed within the same sliding window period + long slidingWindowTimeoutMillis = volume.getConf().getObject(DatanodeConfiguration.class) + .getDiskCheckSlidingWindowTimeout().toMillis(); + long eventRateMillis = slidingWindowTimeoutMillis / ioTestCount; for (int i = 0; i < checkResults.length; i++) { + // Advance the test clock so entries in the sliding window eventually time out (no real sleeping needed) + TEST_CLOCK.fastForward(eventRateMillis); final boolean result = checkResults[i]; final DiskCheckUtil.DiskChecks ioResult = new DiskCheckUtil.DiskChecks() { @Override