diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index d2cb4038e6..7f2af3ef2b 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -133,7 +133,10 @@ private boolean tryAccessCluster() { RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); List excludeProperties = rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES); + List includeProperties = + rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES); rssConf.getAll().stream() + .filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey())) .filter(entry -> !excludeProperties.contains(entry.getKey())) .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index aaa2c10925..0b683834c9 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -137,7 +137,10 @@ private boolean tryAccessCluster() { RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); List excludeProperties = rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES); + List includeProperties = + rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES); rssConf.getAll().stream() + .filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey())) .filter(entry -> !excludeProperties.contains(entry.getKey())) .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java index 092869e31d..a38bfad7ba 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java +++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java @@ -22,7 +22,11 @@ import org.apache.spark.SparkConf; import org.apache.spark.shuffle.sort.SortShuffleManager; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.apache.uniffle.client.api.CoordinatorClient; +import org.apache.uniffle.client.request.RssAccessClusterRequest; +import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.storage.util.StorageType; import static org.apache.uniffle.common.rpc.StatusCode.ACCESS_DENIED; @@ -30,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; public class DelegationRssShuffleManagerTest extends RssShuffleManagerTestBase { @@ -131,6 +136,87 @@ public void testTryAccessCluster() throws Exception { assertCreateSortShuffleManager(secondConf); } + @Test + public void testDefaultIncludeExcludeProperties() throws Exception { + final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS); + SparkConf conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); + conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); + conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); + conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); + final int confInitKeyCount = conf.getAll().length; + assertCreateRssShuffleManager(conf); + + // default case: access cluster should include all properties in conf and an extra one. + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(RssAccessClusterRequest.class); + verify(mockClient).accessCluster(argumentCaptor.capture()); + RssAccessClusterRequest request = argumentCaptor.getValue(); + assertEquals(confInitKeyCount + 1, request.getExtraProperties().size()); + } + + @Test + public void testIncludeProperties() throws Exception { + final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS); + SparkConf conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); + conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); + conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); + conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); + // test include properties + conf.set( + RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + + RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES.key(), + RssSparkConfig.RSS_ACCESS_ID + .key() + .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length())); + assertCreateRssShuffleManager(conf); + + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(RssAccessClusterRequest.class); + + verify(mockClient).accessCluster(argumentCaptor.capture()); + RssAccessClusterRequest request = argumentCaptor.getValue(); + // only accessId and extra one + assertEquals(1 + 1, request.getExtraProperties().size()); + } + + @Test + public void testExcludeProperties() throws Exception { + final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS); + SparkConf conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); + conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); + conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); + conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); + // test exclude properties + conf.set( + RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + + RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES.key(), + RssSparkConfig.RSS_ACCESS_ID + .key() + .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length())); + final int confInitKeyCount = conf.getAll().length; + assertCreateRssShuffleManager(conf); + + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(RssAccessClusterRequest.class); + + verify(mockClient).accessCluster(argumentCaptor.capture()); + RssAccessClusterRequest request = argumentCaptor.getValue(); + // all accessId and extra one except the excluded one + assertEquals(confInitKeyCount + 1 - 1, request.getExtraProperties().size()); + } + private void assertCreateSortShuffleManager(SparkConf conf) throws Exception { DelegationRssShuffleManager delegationRssShuffleManager = new DelegationRssShuffleManager(conf, true); diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java index 09cf24cf88..4b7cb62026 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java +++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java @@ -56,7 +56,7 @@ protected CoordinatorClient createCoordinatorClient(StatusCode status) { return mockedCoordinatorClient; } - void setupMockedRssShuffleUtils(StatusCode status) { + CoordinatorClient setupMockedRssShuffleUtils(StatusCode status) { CoordinatorClient mockCoordinatorClient = createCoordinatorClient(status); List coordinatorClients = Lists.newArrayList(); coordinatorClients.add(mockCoordinatorClient); @@ -65,5 +65,6 @@ void setupMockedRssShuffleUtils(StatusCode status) { mockedStaticRssShuffleUtils .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any())) .thenReturn(client); + return mockCoordinatorClient; } } diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 79b41afcc5..36d80b6e57 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -310,4 +310,11 @@ public class RssClientConf { .asList() .defaultValues() .withDescription("the report exclude properties could be configured by this option"); + + public static final ConfigOption> RSS_CLIENT_REPORT_INCLUDE_PROPERTIES = + ConfigOptions.key("rss.client.reportIncludeProperties") + .stringType() + .asList() + .noDefaultValue() + .withDescription("the report include properties could be configured by this option"); } diff --git a/docs/client_guide/client_guide.md b/docs/client_guide/client_guide.md index 3715289311..4092df4540 100644 --- a/docs/client_guide/client_guide.md +++ b/docs/client_guide/client_guide.md @@ -60,6 +60,8 @@ The important configuration of client is listed as following. These configuratio | .rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. | | .rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. | | .rss.client.blockIdManagerClass | - | The block id manager class of server for this application, the implementation of this interface to manage the shuffle block ids | +| .rss.client.reportExcludeProperties | - | The value of exclude properties specify a list of client configuration properties that should not be reported to the coordinator by the DelegationRssShuffleManager. | +| .rss.client.reportIncludeProperties | - | The value of include properties specify a list of client configuration properties that should be exclusively reported to the coordinator by the DelegationRssShuffleManager. | Notice: