Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public enum LogKeys implements LogKey {
EXPR,
EXPR_TERMS,
EXTENDED_EXPLAIN_GENERATOR,
FALLBACK_STORAGE_BLOCKS_SIZE,
FAILED_STAGE,
FAILED_STAGE_NAME,
FAILURES,
Expand Down Expand Up @@ -473,6 +474,7 @@ public enum LogKeys implements LogKey {
NUM_EXECUTOR_DESIRED,
NUM_EXECUTOR_LAUNCH,
NUM_EXECUTOR_TARGET,
NUM_FALLBACK_STORAGE_BLOCKS,
NUM_FAILURES,
NUM_FEATURES,
NUM_FILES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,13 @@ package object config {
"maxRemoteBlockSizeFetchToMem cannot be larger than (Int.MaxValue - 512) bytes.")
.createWithDefaultString("200m")

private[spark] val REDUCER_FALLBACK_STORAGE_READ_THREADS =
  ConfigBuilder("spark.reducer.fallbackStorage.readThreads")
    .doc("Number of threads used by the reducer to read shuffle blocks from fallback storage.")
    .version("4.2.0")
    .intConf
    // Fail fast at configuration time: a non-positive thread count would otherwise only
    // surface later, when the fetch thread pool is constructed.
    .checkValue(_ > 0, "The number of fallback storage read threads must be a positive integer.")
    .createWithDefault(5)

private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =
ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses")
.doc("Enable tracking of updatedBlockStatuses in the TaskMetrics. Off by default since " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ trait BlockDataManager {
*/
def getLocalBlockData(blockId: BlockId): ManagedBuffer

/**
 * Interface to get fallback storage block data. Throws an exception if the block cannot be found
 * or cannot be read successfully.
 *
 * NOTE(review): implementations appear to support shuffle blocks only and to require a
 * fallback storage path to be configured — confirm and document the contract here.
 */
def getFallbackStorageBlockData(blockId: BlockId): ManagedBuffer

/**
* Put the block locally, using the given storage level.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.get(config.SHUFFLE_MAX_ATTEMPTS_ON_NETTY_OOM),
SparkEnv.get.conf.get(config.REDUCER_FALLBACK_STORAGE_READ_THREADS),
SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT_MEMORY),
SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ENABLED),
Expand Down
30 changes: 20 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -759,16 +759,7 @@ private[spark] class BlockManager(
override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
if (blockId.isShuffle) {
logDebug(s"Getting local shuffle block ${blockId}")
try {
shuffleManager.shuffleBlockResolver.getBlockData(blockId)
} catch {
case e: IOException =>
if (conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
FallbackStorage.read(conf, blockId)
} else {
throw e
}
}
shuffleManager.shuffleBlockResolver.getBlockData(blockId)
} else {
getLocalBytes(blockId) match {
case Some(blockData) =>
Expand All @@ -783,6 +774,25 @@ private[spark] class BlockManager(
}
}

/**
 * Interface to get fallback storage block data. Throws an exception if the block cannot be found
 * or cannot be read successfully.
 *
 * Only shuffle blocks are served from fallback storage here; a request for any other block
 * reports the block as missing to the master and then fails.
 */
override def getFallbackStorageBlockData(blockId: BlockId): ManagedBuffer = {
  // Use the two-argument require so a misconfigured caller gets an actionable error
  // instead of a bare IllegalArgumentException.
  require(conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined,
    "Cannot read fallback storage block data because no fallback storage path is configured.")

  if (blockId.isShuffle) {
    logDebug(s"Getting fallback storage block ${blockId}")
    FallbackStorage.read(conf, blockId)
  } else {
    // If this block manager receives a request for a block that it doesn't have then it's
    // likely that the master has outdated block statuses for this block. Therefore, we send
    // an RPC so that this block is marked as being unavailable from this block manager.
    reportBlockStatus(blockId, BlockStatus.empty)
    throw SparkCoreErrors.blockNotFoundError(blockId)
  }
}

/**
* Put the block locally, using the given storage level.
*
Expand Down
Loading
Loading