diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/ExternalResourceService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/ExternalResourceService.java index b75e077823..0516843f71 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/ExternalResourceService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/ExternalResourceService.java @@ -50,6 +50,12 @@ List getAppInfo( ExternalResourceIdentifier identifier) throws RMErrorException; + Map getBatchResource( + ResourceType resourceType, + RMLabelContainer labelContainer, + List identifiers) + throws RMErrorException; + ExternalResourceProvider chooseProvider( ResourceType resourceType, RMLabelContainer labelContainer) throws RMErrorException; } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java index bd6eea8e80..1c7dcad229 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.linkis.manager.rm.external.service.impl; import org.apache.linkis.manager.common.conf.RMConfiguration; +import org.apache.linkis.manager.common.entity.resource.CommonNodeResource; import org.apache.linkis.manager.common.entity.resource.NodeResource; import org.apache.linkis.manager.common.entity.resource.ResourceType; import org.apache.linkis.manager.common.exception.RMErrorException; @@ -33,6 +34,8 @@ import org.apache.linkis.manager.rm.external.parser.YarnResourceIdentifierParser; import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester; import org.apache.linkis.manager.rm.external.service.ExternalResourceService; +import org.apache.linkis.manager.rm.external.yarn.YarnQueueInfo; +import org.apache.linkis.manager.rm.external.yarn.YarnResourceIdentifier; import org.apache.linkis.manager.rm.external.yarn.YarnResourceRequester; import org.apache.linkis.manager.rm.utils.RMUtils; @@ -44,11 +47,13 @@ import java.net.ConnectException; import java.text.MessageFormat; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import com.fasterxml.jackson.core.JsonParseException; import com.google.common.cache.CacheBuilder; @@ -141,6 +146,60 @@ public List getAppInfo( return appInfos; } + @Override + public Map getBatchResource( + ResourceType resourceType, + RMLabelContainer labelContainer, + List identifiers) + throws RMErrorException { + ExternalResourceProvider provider = chooseProvider(resourceType, labelContainer); + ExternalResourceRequester externalResourceRequester = getRequester(resourceType); + + if (externalResourceRequester instanceof YarnResourceRequester) { + YarnResourceRequester yarnRequester = (YarnResourceRequester) externalResourceRequester; + List queueNames = + identifiers.stream() + .map(id -> ((YarnResourceIdentifier) id).getQueueName()) + .collect(Collectors.toList()); + + Map batchResources = + (Map) + retry( + RMConfiguration.EXTERNAL_RETRY_NUM.getValue(), + (i) -> + yarnRequester.getBatchResources( + yarnRequester.getAndUpdateActiveRmWebAddress(provider), + queueNames, + provider), + (i) -> yarnRequester.reloadExternalResourceAddress(provider)); + + Map result = new HashMap<>(); + batchResources.forEach( + (queueName, queueInfo) -> { + CommonNodeResource nodeResource = new CommonNodeResource(); + nodeResource.setMaxResource(queueInfo.getMaxResource()); + nodeResource.setUsedResource(queueInfo.getUsedResource()); + nodeResource.setMaxApps(queueInfo.getMaxApps()); + nodeResource.setNumPendingApps(queueInfo.getNumPendingApps()); + nodeResource.setNumActiveApps(queueInfo.getNumActiveApps()); + result.put(queueName, nodeResource); + }); + return result; + } else { + // For other resource types, fall back to individual requests + Map result = new HashMap<>(); + for (ExternalResourceIdentifier identifier : identifiers) { + try { + NodeResource resource = getResource(resourceType, labelContainer, identifier); + result.put(((YarnResourceIdentifier) identifier).getQueueName(), resource); + } catch (Exception e) { + logger.error("Failed to get resource for identifier " + identifier, e); + } + } + return result; + } + } + private Object retry(int retryNum, Function function, Function reloadExternalAddress) throws RMErrorException { int times = 0; diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java index f0cc97a342..110f58e632 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java @@ -218,6 +218,31 @@ public YarnQueueInfo getResources( String queueName, ExternalResourceProvider provider) { JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider); + return getResourcesFromResponse(resp, realQueueName, queueName, provider); + } + + public Map getBatchResources( + String rmWebAddress, List queueNames, ExternalResourceProvider provider) { + JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider); + Map queueInfoMap = new HashMap<>(); + for (String queueName : queueNames) { + try { + String realQueueName = queuePrefix + queueName; + if (queueName.startsWith(queuePrefix)) { + realQueueName = queueName; + } + YarnQueueInfo queueInfo = + getResourcesFromResponse(resp, realQueueName, queueName, provider); + queueInfoMap.put(queueName, queueInfo); + } catch (Exception e) { + logger.error("Failed to get resource for queue " + queueName, e); + } + } + return queueInfoMap; + } + + private YarnQueueInfo getResourcesFromResponse( + JsonNode resp, String realQueueName, String queueName, ExternalResourceProvider provider) { JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo"); String schedulerType = schedulerInfo.path("type").asText(); if ("capacityScheduler".equals(schedulerType)) { @@ -233,7 +258,8 @@ public YarnQueueInfo getResources( } JsonNode queueInfo = queue.get(); return new YarnQueueInfo( - maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(), + maxEffectiveHandle(queue, getAndUpdateActiveRmWebAddress(provider), queueName, provider) + .get(), getYarnResource(queue.map(node -> node.path("resourcesUsed")), queueName).get(), queueInfo.path("maxApps").asInt(), queueInfo.path("numPendingApps").asInt(), @@ -323,7 +349,8 @@ public List requestAppInfo( + "&states=" + YarnAppState.RUNNING.getState() + "," - + YarnAppState.ACCEPTED.getState(); + + YarnAppState.ACCEPTED.getState() + + RMConfiguration.YARN_APPS_FILTER_PARMS.getValue(); resp = getResponseByUrl("apps" + queryParams, rmWebAddress, provider).path("apps").path("app"); } else { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala index a737884f17..7bf1acca23 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala @@ -52,6 +52,7 @@ import org.apache.linkis.manager.persistence.{ ResourceManagerPersistence } import org.apache.linkis.manager.rm.domain.RMLabelContainer +import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier import org.apache.linkis.manager.rm.external.service.ExternalResourceService import org.apache.linkis.manager.rm.external.yarn.{YarnAppInfo, YarnResourceIdentifier} import org.apache.linkis.manager.rm.restful.vo.{UserCreatorEngineType, UserResourceVo} @@ -574,6 +575,174 @@ class RMMonitorRest extends Logging { appendMessageData(message, "queues", clusters) } + @ApiOperation(value = "getBatchQueueResource", notes = "get batch queue resource") + @RequestMapping(path = Array("batchqueueresources"), method = Array(RequestMethod.POST)) + def getBatchQueueResource( + request: HttpServletRequest, + @RequestBody param: util.Map[String, AnyRef] + ): Message = { + ModuleUserUtils.getOperationUser(request, "getBatchQueueResource") + val message = Message.ok("") + val queueNamesParam = param.get("queueNames") + if (queueNamesParam == null) { + return Message.error("queueNames parameter is required") + } + val queueNames = queueNamesParam match { + case list: java.util.List[_] => + list.asScala.map(_.toString.trim).filter(StringUtils.isNotBlank).toArray + case array: Array[_] => + array.map(_.toString.trim).filter(StringUtils.isNotBlank) + case _ => + return Message.error("queueNames parameter must be an array or comma-separated string") + } + if (queueNames.isEmpty) { + return Message.error("queueNames parameter is empty") + } + var clustername = param.get("clustername").asInstanceOf[String] + val crossCluster = java.lang.Boolean.parseBoolean( + param.getOrDefault("crossCluster", "false").asInstanceOf[String] + ) + if (crossCluster) { + clustername = AMConfiguration.PRIORITY_CLUSTER_TARGET + } + val clusterLabel = labelFactory.createLabel(classOf[ClusterLabel]) + clusterLabel.setClusterName(clustername) + clusterLabel.setClusterType(param.get("clustertype").asInstanceOf[String]) + val labelContainer = new RMLabelContainer(Lists.newArrayList(clusterLabel)) + val queueInfoMap = new mutable.HashMap[String, AnyRef]() + + try { + // Process queue names and create identifiers + import java.util.ArrayList + val identifiers = new ArrayList[ExternalResourceIdentifier]() + queueNames.foreach { queueName => + var processedQueueName = queueName + if ( + StringUtils.isNotBlank(processedQueueName) && processedQueueName.startsWith(queuePrefix) + ) { + logger.info( + "Queue name {} starts with '{}', remove '{}'", + processedQueueName, + queuePrefix, + queuePrefix + ) + processedQueueName = processedQueueName.substring(queuePrefix.length) + } + identifiers.add(new YarnResourceIdentifier(processedQueueName)) + } + + // Use batch API to get all queue resources at once + val batchResources = + externalResourceService.getBatchResource(ResourceType.Yarn, labelContainer, identifiers) + + // Process the results + import scala.collection.JavaConverters._ + batchResources.asScala.foreach { case (queueName, nodeResource) => + ( + nodeResource.getMaxResource.asInstanceOf[YarnResource], + nodeResource.getUsedResource.asInstanceOf[YarnResource] + ) match { + case (maxResource, usedResource) => + val queueInfo = new mutable.HashMap[String, AnyRef]() + queueInfo.put("queuename", maxResource) + queueInfo.put( + "maxResources", + Map( + "memory" -> maxResource.getQueueMemory, + "cores" -> maxResource.getQueueCores + ).asJava + ) + queueInfo.put( + "usedResources", + Map( + "memory" -> usedResource.getQueueMemory, + "cores" -> usedResource.getQueueCores + ).asJava + ) + val usedMemoryPercentage = usedResource.getQueueMemory + .asInstanceOf[Double] / maxResource.getQueueMemory.asInstanceOf[Double] + val usedCPUPercentage = usedResource.getQueueCores + .asInstanceOf[Double] / maxResource.getQueueCores.asInstanceOf[Double] + queueInfo.put( + "usedPercentage", + Map("memory" -> usedMemoryPercentage, "cores" -> usedCPUPercentage).asJava + ) + queueInfo.put("maxApps", nodeResource.getMaxApps.asInstanceOf[AnyRef]) + queueInfo.put("numActiveApps", nodeResource.getNumActiveApps.asInstanceOf[AnyRef]) + queueInfo.put("numPendingApps", nodeResource.getNumPendingApps.asInstanceOf[AnyRef]) + queueInfoMap.put(queueName, queueInfo.asJava) + case _ => + logger.warn(s"Failed to get queue resource for $queueName") + } + } + } catch { + case e: Exception => + logger.error("Failed to get batch queue resources", e) + // Fall back to individual requests if batch API fails + queueNames.foreach { queueName => + try { + var processedQueueName = queueName + if ( + StringUtils + .isNotBlank(processedQueueName) && processedQueueName.startsWith(queuePrefix) + ) { + processedQueueName = processedQueueName.substring(queuePrefix.length) + } + val yarnIdentifier = new YarnResourceIdentifier(processedQueueName) + val providedYarnResource = + externalResourceService.getResource(ResourceType.Yarn, labelContainer, yarnIdentifier) + ( + providedYarnResource.getMaxResource.asInstanceOf[YarnResource], + providedYarnResource.getUsedResource.asInstanceOf[YarnResource] + ) match { + case (maxResource, usedResource) => + val queueInfo = new mutable.HashMap[String, AnyRef]() + queueInfo.put( + "maxResources", + Map( + "memory" -> maxResource.getQueueMemory, + "cores" -> maxResource.getQueueCores + ).asJava + ) + queueInfo.put( + "usedResources", + Map( + "memory" -> usedResource.getQueueMemory, + "cores" -> usedResource.getQueueCores + ).asJava + ) + val usedMemoryPercentage = usedResource.getQueueMemory + .asInstanceOf[Double] / maxResource.getQueueMemory.asInstanceOf[Double] + val usedCPUPercentage = usedResource.getQueueCores + .asInstanceOf[Double] / maxResource.getQueueCores.asInstanceOf[Double] + queueInfo.put( + "usedPercentage", + Map("memory" -> usedMemoryPercentage, "cores" -> usedCPUPercentage).asJava + ) + queueInfo.put("maxApps", providedYarnResource.getMaxApps.asInstanceOf[AnyRef]) + queueInfo.put( + "numActiveApps", + providedYarnResource.getNumActiveApps.asInstanceOf[AnyRef] + ) + queueInfo.put( + "numPendingApps", + providedYarnResource.getNumPendingApps.asInstanceOf[AnyRef] + ) + queueInfoMap.put(queueName, queueInfo.asJava) + case _ => + logger.warn(s"Failed to get queue resource for $queueName") + } + } catch { + case ex: Exception => + logger.error(s"Failed to get queue resource for $queueName", ex) + } + } + } + + appendMessageData(message, "queueInfos", queueInfoMap.asJava) + message + } + private def getEngineNodesByUserList( userList: List[String], withResource: Boolean = false diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java index e8f9f530eb..78065d7b4b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java @@ -91,4 +91,9 @@ public class RMConfiguration { public static final CommonVars YARN_APPS_FILTER_ENABLED = CommonVars.apply("wds.linkis.rm.yarn.apps.filter.enabled", true); + + public static final CommonVars YARN_APPS_FILTER_PARMS = + CommonVars.apply( + "wds.linkis.rm.yarn.apps.filter.parms", + "&deSelects=resourceRequests,timeouts,appNodeLabelExpression,amNodeLabelExpression,resourceInfo"); }