From ca7186cbaabb77e4139e11de06da5da7b74a04a3 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Tue, 26 Nov 2024 20:51:01 +0800 Subject: [PATCH 1/4] Introduce an include property config list to limit report config size --- .../apache/spark/shuffle/DelegationRssShuffleManager.java | 3 +++ .../apache/spark/shuffle/DelegationRssShuffleManager.java | 3 +++ .../org/apache/uniffle/common/config/RssClientConf.java | 7 +++++++ 3 files changed, 13 insertions(+) diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index d2cb4038e6..0750d58d3a 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -133,7 +133,10 @@ private boolean tryAccessCluster() { RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); List excludeProperties = rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES); + List includeProperties = + rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES); rssConf.getAll().stream() + .filter(entry -> includeProperties.isEmpty() || includeProperties.contains(entry.getKey())) .filter(entry -> !excludeProperties.contains(entry.getKey())) .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index aaa2c10925..34c8e30788 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -137,7 +137,10 @@ private boolean tryAccessCluster() { RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); List excludeProperties = rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES); + List includeProperties = + rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES); rssConf.getAll().stream() + .filter(entry -> includeProperties.isEmpty() || includeProperties.contains(entry.getKey())) .filter(entry -> !excludeProperties.contains(entry.getKey())) .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 79b41afcc5..d1b274bc25 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -310,4 +310,11 @@ public class RssClientConf { .asList() .defaultValues() .withDescription("the report exclude properties could be configured by this option"); + + public static final ConfigOption> RSS_CLIENT_REPORT_INCLUDE_PROPERTIES = + ConfigOptions.key("rss.client.reportIncludeProperties") + .stringType() + .asList() + .defaultValues() + .withDescription("the report include properties could be configured by this option"); } From 063378c5fadc82198b0813c007267c362e4a8876 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Tue, 26 Nov 2024 20:57:48 +0800 Subject: [PATCH 2/4] Add documents --- docs/client_guide/client_guide.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/client_guide/client_guide.md b/docs/client_guide/client_guide.md index 3715289311..46642575cc 100644 --- a/docs/client_guide/client_guide.md +++ b/docs/client_guide/client_guide.md @@ -60,6 +60,8 @@ The important configuration of client is listed as following. These configuratio | .rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. | | .rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. | | .rss.client.blockIdManagerClass | - | The block id manager class of server for this application, the implementation of this interface to manage the shuffle block ids | +| .rss.client.reportExcludeProperties | - | the report exclude properties could be configured by this option | +| .rss.client.reportIncludeProperties | - | the report include properties could be configured by this option | Notice: From a1fb2fdf90c3b82499ed836b5e11eb72ea25d1fd Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Tue, 3 Dec 2024 00:32:22 +0800 Subject: [PATCH 3/4] Address comment, set no default value and add UT --- .../shuffle/DelegationRssShuffleManager.java | 2 +- .../shuffle/DelegationRssShuffleManager.java | 2 +- .../DelegationRssShuffleManagerTest.java | 86 +++++++++++++++++++ .../shuffle/RssShuffleManagerTestBase.java | 3 +- .../uniffle/common/config/RssClientConf.java | 2 +- 5 files changed, 91 insertions(+), 4 deletions(-) diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index 0750d58d3a..7f2af3ef2b 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -136,7 +136,7 @@ private boolean tryAccessCluster() { List includeProperties = rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES); rssConf.getAll().stream() - .filter(entry -> includeProperties.isEmpty() || includeProperties.contains(entry.getKey())) + .filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey())) .filter(entry -> !excludeProperties.contains(entry.getKey())) .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index 34c8e30788..0b683834c9 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -140,7 +140,7 @@ private boolean tryAccessCluster() { List includeProperties = rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES); rssConf.getAll().stream() - .filter(entry -> includeProperties.isEmpty() || includeProperties.contains(entry.getKey())) + .filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey())) .filter(entry -> !excludeProperties.contains(entry.getKey())) .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java index 092869e31d..a38bfad7ba 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java +++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java @@ -22,7 +22,11 @@ import org.apache.spark.SparkConf; import org.apache.spark.shuffle.sort.SortShuffleManager; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.apache.uniffle.client.api.CoordinatorClient; +import org.apache.uniffle.client.request.RssAccessClusterRequest; +import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.storage.util.StorageType; import static org.apache.uniffle.common.rpc.StatusCode.ACCESS_DENIED; @@ -30,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; public class DelegationRssShuffleManagerTest extends RssShuffleManagerTestBase { @@ -131,6 +136,87 @@ public void testTryAccessCluster() throws Exception { assertCreateSortShuffleManager(secondConf); } + @Test + public void testDefaultIncludeExcludeProperties() throws Exception { + final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS); + SparkConf conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); + conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); + conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); + conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); + final int confInitKeyCount = conf.getAll().length; + assertCreateRssShuffleManager(conf); + + // default case: access cluster should include all properties in conf and an extra one. + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(RssAccessClusterRequest.class); + verify(mockClient).accessCluster(argumentCaptor.capture()); + RssAccessClusterRequest request = argumentCaptor.getValue(); + assertEquals(confInitKeyCount + 1, request.getExtraProperties().size()); + } + + @Test + public void testIncludeProperties() throws Exception { + final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS); + SparkConf conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); + conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); + conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); + conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); + // test include properties + conf.set( + RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + + RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES.key(), + RssSparkConfig.RSS_ACCESS_ID + .key() + .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length())); + assertCreateRssShuffleManager(conf); + + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(RssAccessClusterRequest.class); + + verify(mockClient).accessCluster(argumentCaptor.capture()); + RssAccessClusterRequest request = argumentCaptor.getValue(); + // only accessId and extra one + assertEquals(1 + 1, request.getExtraProperties().size()); + } + + @Test + public void testExcludeProperties() throws Exception { + final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS); + SparkConf conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L); + conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); + conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); + conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); + conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); + // test exclude properties + conf.set( + RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + + RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES.key(), + RssSparkConfig.RSS_ACCESS_ID + .key() + .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length())); + final int confInitKeyCount = conf.getAll().length; + assertCreateRssShuffleManager(conf); + + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(RssAccessClusterRequest.class); + + verify(mockClient).accessCluster(argumentCaptor.capture()); + RssAccessClusterRequest request = argumentCaptor.getValue(); + // all accessId and extra one except the excluded one + assertEquals(confInitKeyCount + 1 - 1, request.getExtraProperties().size()); + } + private void assertCreateSortShuffleManager(SparkConf conf) throws Exception { DelegationRssShuffleManager delegationRssShuffleManager = new DelegationRssShuffleManager(conf, true); diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java index 09cf24cf88..4b7cb62026 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java +++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java @@ -56,7 +56,7 @@ protected CoordinatorClient createCoordinatorClient(StatusCode status) { return mockedCoordinatorClient; } - void setupMockedRssShuffleUtils(StatusCode status) { + CoordinatorClient setupMockedRssShuffleUtils(StatusCode status) { CoordinatorClient mockCoordinatorClient = createCoordinatorClient(status); List coordinatorClients = Lists.newArrayList(); coordinatorClients.add(mockCoordinatorClient); @@ -65,5 +65,6 @@ void setupMockedRssShuffleUtils(StatusCode status) { mockedStaticRssShuffleUtils .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any())) .thenReturn(client); + return mockCoordinatorClient; } } diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index d1b274bc25..36d80b6e57 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -315,6 +315,6 @@ public class RssClientConf { ConfigOptions.key("rss.client.reportIncludeProperties") .stringType() .asList() - .defaultValues() + .noDefaultValue() .withDescription("the report include properties could be configured by this option"); } From 1de03b7ff52b5d68c9da7ea09b335d60b6f050ca Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Fri, 3 Jan 2025 11:35:15 +0800 Subject: [PATCH 4/4] Improve document --- docs/client_guide/client_guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/client_guide/client_guide.md b/docs/client_guide/client_guide.md index 46642575cc..4092df4540 100644 --- a/docs/client_guide/client_guide.md +++ b/docs/client_guide/client_guide.md @@ -60,8 +60,8 @@ The important configuration of client is listed as following. These configuratio | .rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. | | .rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. | | .rss.client.blockIdManagerClass | - | The block id manager class of server for this application, the implementation of this interface to manage the shuffle block ids | -| .rss.client.reportExcludeProperties | - | the report exclude properties could be configured by this option | -| .rss.client.reportIncludeProperties | - | the report include properties could be configured by this option | +| .rss.client.reportExcludeProperties | - | The value of exclude properties specify a list of client configuration properties that should not be reported to the coordinator by the DelegationRssShuffleManager. | +| .rss.client.reportIncludeProperties | - | The value of include properties specify a list of client configuration properties that should be exclusively reported to the coordinator by the DelegationRssShuffleManager. | Notice: