diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java index c6d34b5b41..2feebcf6e3 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java @@ -40,6 +40,7 @@ public class AccessManager { private final CoordinatorConf coordinatorConf; private final ClusterManager clusterManager; private final QuotaManager quotaManager; + private final BannedManager bannedManager; private final Configuration hadoopConf; private List accessCheckers = Lists.newArrayList(); @@ -53,6 +54,7 @@ public AccessManager( this.clusterManager = clusterManager; this.hadoopConf = hadoopConf; this.quotaManager = quotaManager; + this.bannedManager = new BannedManager(coordinatorConf); init(); } @@ -103,6 +105,10 @@ public QuotaManager getQuotaManager() { return quotaManager; } + public BannedManager getBannedManager() { + return bannedManager; + } + public void close() throws IOException { for (AccessChecker checker : accessCheckers) { checker.close(); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/BannedManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/BannedManager.java new file mode 100644 index 0000000000..24049d2abf --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/BannedManager.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator; + +import java.util.Collections; +import java.util.Set; + +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** BannedManager is a manager for ban the abnormal app. */ +public class BannedManager { + private static final Logger LOG = LoggerFactory.getLogger(BannedManager.class); + // version, bannedIds + private volatile Pair> bannedInfo = Pair.of("0", Collections.emptySet()); + + public BannedManager(CoordinatorConf conf) { + LOG.info("BannedManager initialized successfully."); + } + + public boolean checkBanned(String id) { + return bannedInfo.getValue().contains(id); + } + + public void reloadBannedIdsFromRest(Pair> newBannedIds) { + if (newBannedIds.getKey().equals(bannedInfo.getKey())) { + LOG.warn("receive bannedIds from rest with the same version: {}", newBannedIds.getKey()); + } + bannedInfo = newBannedIds; + } + + public String getVersion() { + return bannedInfo.getKey(); + } + + public Pair> getBannedInfo() { + return bannedInfo; + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 5b1f64ff16..933b968e5a 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -256,6 +256,16 @@ public class CoordinatorConf extends RssBaseConf { .asList() .defaultValues("appHeartbeat", "heartbeat") .withDescription("Exclude record rpc audit operation list, separated by ','"); + public static final ConfigOption COORDINATOR_ACCESS_BANNED_ID_PROVIDER = + ConfigOptions.key("rss.coordinator.access.bannedIdProvider") + .stringType() + .noDefaultValue() + .withDescription("Get the banned id from Access banned id provider "); + public static final ConfigOption COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN = + ConfigOptions.key("rss.coordinator.access.bannedIdProviderPattern") + .stringType() + .defaultValue("(.*)") + .withDescription("The regular banned id pattern to extract"); public CoordinatorConf() {} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessBannedChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessBannedChecker.java new file mode 100644 index 0000000000..4be3d18627 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessBannedChecker.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.access.checker; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.ReconfigurableRegistry; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.access.AccessCheckResult; +import org.apache.uniffle.coordinator.access.AccessInfo; +import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; + +/** + * AccessBannedChecker maintain a list of banned id and update it periodically, it checks the banned + * id in the access request and reject if the id is in the banned list. + */ +public class AccessBannedChecker extends AbstractAccessChecker { + private static final Logger LOG = LoggerFactory.getLogger(AccessBannedChecker.class); + private final AccessManager accessManager; + private String bannedIdProviderKey; + private Pattern bannedIdProviderPattern; + + public AccessBannedChecker(AccessManager accessManager) throws Exception { + super(accessManager); + this.accessManager = accessManager; + CoordinatorConf conf = accessManager.getCoordinatorConf(); + bannedIdProviderKey = conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER); + updateBannedIdProviderPattern(conf); + + LOG.info( + "Construct BannedChecker. BannedIdProviderKey is {}, pattern is {}", + bannedIdProviderKey, + bannedIdProviderPattern.pattern()); + ReconfigurableRegistry.register( + Sets.newHashSet( + CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER.key(), + CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN.key()), + (theConf, changedProperties) -> { + if (changedProperties == null) { + return; + } + if (changedProperties.contains( + CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER.key())) { + this.bannedIdProviderKey = + conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER); + } + if (changedProperties.contains( + CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER.key())) { + updateBannedIdProviderPattern(conf); + } + }); + } + + @Override + public AccessCheckResult check(AccessInfo accessInfo) { + if (accessInfo.getExtraProperties() != null + && bannedIdProviderKey != null + && accessInfo.getExtraProperties().containsKey(bannedIdProviderKey)) { + String bannedIdPropertyValue = accessInfo.getExtraProperties().get(bannedIdProviderKey); + Matcher matcher = bannedIdProviderPattern.matcher(bannedIdPropertyValue); + if (matcher.find()) { + String bannedId = matcher.group(1); + if (accessManager.getBannedManager() != null + && accessManager.getBannedManager().checkBanned(bannedId)) { + String msg = String.format("Denied by BannedChecker, accessInfo[%s].", accessInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("BannedIdPropertyValue is {}, {}", bannedIdPropertyValue, msg); + } + CoordinatorMetrics.counterTotalBannedDeniedRequest.inc(); + return new AccessCheckResult(false, msg); + } + } + } + + return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); + } + + private void updateBannedIdProviderPattern(RssConf conf) { + String bannedIdProviderRegex = + conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN); + bannedIdProviderPattern = Pattern.compile(bannedIdProviderRegex); + } + + @Override + public void close() {} +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java index a97892526e..19fd226c8f 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java @@ -43,6 +43,7 @@ public class CoordinatorMetrics { private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request"; private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request"; private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request"; + private static final String TOTAL_BANNED_DENIED_REQUEST = "total_banned_denied_request"; public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_"; public static final String APP_NUM_TO_USER = "app_num"; public static final String USER_LABEL = "user_name"; @@ -57,6 +58,7 @@ public class CoordinatorMetrics { public static Counter counterTotalCandidatesDeniedRequest; public static Counter counterTotalQuotaDeniedRequest; public static Counter counterTotalLoadDeniedRequest; + public static Counter counterTotalBannedDeniedRequest; public static final Map GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap(); private static MetricsManager metricsManager; @@ -118,5 +120,6 @@ private static void setUpMetrics() { metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST); counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST); counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST); + counterTotalBannedDeniedRequest = metricsManager.addCounter(TOTAL_BANNED_DENIED_REQUEST); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BannedResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BannedResource.java new file mode 100644 index 0000000000..e6a6f758a0 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BannedResource.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.resource; + +import java.util.Set; +import javax.servlet.ServletContext; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hbase.thirdparty.javax.ws.rs.Consumes; +import org.apache.hbase.thirdparty.javax.ws.rs.GET; +import org.apache.hbase.thirdparty.javax.ws.rs.POST; +import org.apache.hbase.thirdparty.javax.ws.rs.Path; +import org.apache.hbase.thirdparty.javax.ws.rs.core.Context; +import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.web.resource.BaseResource; +import org.apache.uniffle.common.web.resource.Response; +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.BannedManager; +import org.apache.uniffle.coordinator.web.vo.BannedReloadVO; + +@Path("/banned") +public class BannedResource extends BaseResource { + private static final Logger LOG = LoggerFactory.getLogger(BannedResource.class); + @Context protected ServletContext servletContext; + + @Consumes(MediaType.APPLICATION_JSON) + @POST + @Path("/reload") + public Response reload(BannedReloadVO bannedReloadVo) { + BannedManager bannedManager = getAccessManager().getBannedManager(); + if (bannedManager != null && bannedReloadVo != null) { + bannedManager.reloadBannedIdsFromRest( + Pair.of(bannedReloadVo.getVersion(), bannedReloadVo.getIds())); + LOG.info("reload {} banned ids.", bannedReloadVo.getIds().size()); + return Response.success("success"); + } else { + return Response.fail("bannedManager is not initialized or bannedIds is null."); + } + } + + @GET + @Path("version") + public Response version() { + BannedManager bannedManager = getAccessManager().getBannedManager(); + if (bannedManager != null) { + String version = bannedManager.getVersion(); + LOG.info("Get version of banned ids is {}.", version); + return Response.success(version); + } else { + return Response.fail("bannedManager is not initialized."); + } + } + + @GET + @Path("get") + public Response>> get() { + BannedManager bannedManager = getAccessManager().getBannedManager(); + if (bannedManager != null) { + Pair> bannedInfo = bannedManager.getBannedInfo(); + LOG.info( + "Get version:{} include {} bannedIds ", + bannedInfo.getKey(), + bannedInfo.getValue().size()); + return Response.success(bannedInfo); + } else { + return Response.fail("bannedManager is not initialized."); + } + } + + private AccessManager getAccessManager() { + return (AccessManager) servletContext.getAttribute(AccessManager.class.getCanonicalName()); + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/CoordinatorServerResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/CoordinatorServerResource.java index 57822b516b..02ace0afc6 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/CoordinatorServerResource.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/CoordinatorServerResource.java @@ -116,4 +116,9 @@ public String getCoordinatorStacks() { public Class getConfOps() { return ConfOpsResource.class; } + + @Path("/banned") + public Class getBannedResource() { + return BannedResource.class; + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/BannedReloadVO.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/BannedReloadVO.java new file mode 100644 index 0000000000..eca256ec13 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/BannedReloadVO.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.vo; + +import java.util.Collections; +import java.util.Set; + +public class BannedReloadVO { + private String version; + private Set ids = Collections.emptySet(); + + public String getVersion() { + return version; + } + + public Set getIds() { + return ids; + } + + public void setIds(Set ids) { + if (ids == null) { + ids = Collections.emptySet(); + } + this.ids = ids; + } + + public void setVersion(String version) { + this.version = version; + } + + @Override + public String toString() { + return "BannedIdsVO{" + + "versionId='" + + version + + '\'' + + ", size of bannedIds=" + + ids.size() + + '}'; + } +} diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessBannedCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessBannedCheckerTest.java new file mode 100644 index 0000000000..2eb1a47191 --- /dev/null +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessBannedCheckerTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.checker; + +import java.util.Collections; + +import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.BannedManager; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.access.AccessInfo; +import org.apache.uniffle.coordinator.access.checker.AccessBannedChecker; +import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; + +import static java.lang.Thread.sleep; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AccessBannedCheckerTest { + + @BeforeEach + public void setUp() { + CoordinatorMetrics.register(); + } + + @AfterEach + public void clear() { + CoordinatorMetrics.clear(); + } + + @Test + public void test() throws Exception { + CoordinatorConf conf = new CoordinatorConf(); + conf.set(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER, "test.key"); + conf.set(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN, "(.*)_.*"); + String checkerClassName = AccessBannedChecker.class.getName(); + conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), checkerClassName); + AccessManager accessManager = new AccessManager(conf, null, null, new Configuration()); + BannedManager bannedManager = accessManager.getBannedManager(); + bannedManager.reloadBannedIdsFromRest(Pair.of("version1", Sets.newHashSet("2", "9527", "135"))); + AccessBannedChecker checker = (AccessBannedChecker) accessManager.getAccessCheckers().get(0); + sleep(1200); + assertFalse( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "9527_1234"), + "")) + .isSuccess()); + assertFalse( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "135_1234"), + "")) + .isSuccess()); + assertTrue( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "2"), + "")) + .isSuccess()); + assertTrue( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "1"), + "")) + .isSuccess()); + assertTrue( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "1_2"), + "")) + .isSuccess()); + + checker.close(); + } + + @Test + public void testAbnormal() throws Exception { + CoordinatorConf conf = new CoordinatorConf(); + String checkerClassName = AccessBannedChecker.class.getName(); + conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), checkerClassName); + AccessManager accessManager = new AccessManager(conf, null, null, new Configuration()); + AccessBannedChecker checker = (AccessBannedChecker) accessManager.getAccessCheckers().get(0); + sleep(1200); + // any access is passed for default config. + assertTrue( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "9527_1234"), + "")) + .isSuccess()); + checker.close(); + } +}