diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java index 734dedcccd..2d767980de 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java @@ -17,6 +17,8 @@ package org.apache.spark.shuffle; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import scala.Tuple2; @@ -512,4 +514,15 @@ public static RssConf toRssConf(SparkConf sparkConf) { } return rssConf; } + + public static Map sparkConfToMap(SparkConf sparkConf) { + Map map = new HashMap<>(); + + for (Tuple2 tuple : sparkConf.getAll()) { + String key = tuple._1; + map.put(key, tuple._2); + } + + return map; + } } diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index 8e921c66e7..c1b697cbde 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -34,8 +34,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import scala.Tuple2; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -1064,7 +1062,7 @@ protected void registerShuffleServers( } LOG.info("Start to register shuffleId {}", shuffleId); long start = System.currentTimeMillis(); - Map sparkConfMap = sparkConfToMap(getSparkConf()); + Map sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf()); serverToPartitionRanges.entrySet().stream() .forEach( entry -> { @@ -1095,7 +1093,7 @@ protected void registerShuffleServers( } LOG.info("Start to register shuffleId[{}]", shuffleId); long start = System.currentTimeMillis(); - Map sparkConfMap = sparkConfToMap(getSparkConf()); + Map sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf()); Set>> entries = serverToPartitionRanges.entrySet(); entries.stream() @@ -1141,15 +1139,4 @@ public boolean isRssStageRetryForFetchFailureEnabled() { public SparkConf getSparkConf() { return sparkConf; } - - public Map sparkConfToMap(SparkConf sparkConf) { - Map map = new HashMap<>(); - - for (Tuple2 tuple : sparkConf.getAll()) { - String key = tuple._1; - map.put(key, tuple._2); - } - - return map; - } } diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index af9295e559..027507a47a 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -17,6 +17,7 @@ package org.apache.spark.shuffle; +import java.util.List; import java.util.Map; import java.util.Set; @@ -32,6 +33,8 @@ import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient; import org.apache.uniffle.client.request.RssAccessClusterRequest; import org.apache.uniffle.client.response.RssAccessClusterResponse; +import org.apache.uniffle.common.config.RssClientConf; +import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.Constants; @@ -124,6 +127,13 @@ private boolean tryAccessCluster() { extraProperties.put( ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum)); + RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); + List excludeProperties = + rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES); + rssConf.getAll().stream() + .filter(entry -> !excludeProperties.contains(entry.getKey())) + .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); + Set assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf); try { if (coordinatorClient != null) { diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index bb8ed3a901..ac1c9a0fd2 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -17,6 +17,7 @@ package org.apache.spark.shuffle; +import java.util.List; import java.util.Map; import java.util.Set; @@ -32,6 +33,8 @@ import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient; import org.apache.uniffle.client.request.RssAccessClusterRequest; import org.apache.uniffle.client.response.RssAccessClusterResponse; +import org.apache.uniffle.common.config.RssClientConf; +import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.Constants; @@ -124,6 +127,13 @@ private boolean tryAccessCluster() { extraProperties.put( ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum)); + RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); + List excludeProperties = + rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES); + rssConf.getAll().stream() + .filter(entry -> !excludeProperties.contains(entry.getKey())) + .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); + Set assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf); try { if (coordinatorClient != null) { diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 6d311a549a..79b41afcc5 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -303,4 +303,11 @@ public class RssClientConf { .withDescription( "The block id manager class of server for this application, " + "the implementation of this interface to manage the shuffle block ids"); + + public static final ConfigOption> RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES = + ConfigOptions.key("rss.client.reportExcludeProperties") + .stringType() + .asList() + .defaultValues() + .withDescription("the report exclude properties could be configured by this option"); }