Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ List<ExternalAppInfo> getAppInfo(
ExternalResourceIdentifier identifier)
throws RMErrorException;

Map<String, NodeResource> getBatchResource(
ResourceType resourceType,
RMLabelContainer labelContainer,
List<ExternalResourceIdentifier> identifiers)
throws RMErrorException;

ExternalResourceProvider chooseProvider(
ResourceType resourceType, RMLabelContainer labelContainer) throws RMErrorException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.linkis.manager.rm.external.service.impl;

import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.ResourceType;
import org.apache.linkis.manager.common.exception.RMErrorException;
Expand All @@ -33,6 +34,8 @@
import org.apache.linkis.manager.rm.external.parser.YarnResourceIdentifierParser;
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
import org.apache.linkis.manager.rm.external.service.ExternalResourceService;
import org.apache.linkis.manager.rm.external.yarn.YarnQueueInfo;
import org.apache.linkis.manager.rm.external.yarn.YarnResourceIdentifier;
import org.apache.linkis.manager.rm.external.yarn.YarnResourceRequester;
import org.apache.linkis.manager.rm.utils.RMUtils;

Expand All @@ -44,11 +47,13 @@

import java.net.ConnectException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonParseException;
import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -141,6 +146,60 @@ public List<ExternalAppInfo> getAppInfo(
return appInfos;
}

@Override
public Map<String, NodeResource> getBatchResource(
ResourceType resourceType,
RMLabelContainer labelContainer,
List<ExternalResourceIdentifier> identifiers)
throws RMErrorException {
ExternalResourceProvider provider = chooseProvider(resourceType, labelContainer);
ExternalResourceRequester externalResourceRequester = getRequester(resourceType);

if (externalResourceRequester instanceof YarnResourceRequester) {
YarnResourceRequester yarnRequester = (YarnResourceRequester) externalResourceRequester;
List<String> queueNames =
identifiers.stream()
.map(id -> ((YarnResourceIdentifier) id).getQueueName())
.collect(Collectors.toList());

Map<String, YarnQueueInfo> batchResources =
(Map<String, YarnQueueInfo>)
retry(
RMConfiguration.EXTERNAL_RETRY_NUM.getValue(),
(i) ->
yarnRequester.getBatchResources(
yarnRequester.getAndUpdateActiveRmWebAddress(provider),
queueNames,
provider),
(i) -> yarnRequester.reloadExternalResourceAddress(provider));

Map<String, NodeResource> result = new HashMap<>();
batchResources.forEach(
(queueName, queueInfo) -> {
CommonNodeResource nodeResource = new CommonNodeResource();
nodeResource.setMaxResource(queueInfo.getMaxResource());
nodeResource.setUsedResource(queueInfo.getUsedResource());
nodeResource.setMaxApps(queueInfo.getMaxApps());
nodeResource.setNumPendingApps(queueInfo.getNumPendingApps());
nodeResource.setNumActiveApps(queueInfo.getNumActiveApps());
result.put(queueName, nodeResource);
});
return result;
} else {
// For other resource types, fall back to individual requests
Map<String, NodeResource> result = new HashMap<>();
for (ExternalResourceIdentifier identifier : identifiers) {
try {
NodeResource resource = getResource(resourceType, labelContainer, identifier);
result.put(((YarnResourceIdentifier) identifier).getQueueName(), resource);
} catch (Exception e) {
logger.error("Failed to get resource for identifier " + identifier, e);
}
}
return result;
}
}

private Object retry(int retryNum, Function function, Function reloadExternalAddress)
throws RMErrorException {
int times = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,31 @@ public YarnQueueInfo getResources(
String queueName,
ExternalResourceProvider provider) {
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
return getResourcesFromResponse(resp, realQueueName, queueName, provider);
}

public Map<String, YarnQueueInfo> getBatchResources(
String rmWebAddress, List<String> queueNames, ExternalResourceProvider provider) {
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
Map<String, YarnQueueInfo> queueInfoMap = new HashMap<>();
for (String queueName : queueNames) {
try {
String realQueueName = queuePrefix + queueName;
if (queueName.startsWith(queuePrefix)) {
realQueueName = queueName;
}
YarnQueueInfo queueInfo =
getResourcesFromResponse(resp, realQueueName, queueName, provider);
queueInfoMap.put(queueName, queueInfo);
} catch (Exception e) {
logger.error("Failed to get resource for queue " + queueName, e);
}
}
return queueInfoMap;
}

private YarnQueueInfo getResourcesFromResponse(
JsonNode resp, String realQueueName, String queueName, ExternalResourceProvider provider) {
JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo");
String schedulerType = schedulerInfo.path("type").asText();
if ("capacityScheduler".equals(schedulerType)) {
Expand All @@ -233,7 +258,8 @@ public YarnQueueInfo getResources(
}
JsonNode queueInfo = queue.get();
return new YarnQueueInfo(
maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(),
maxEffectiveHandle(queue, getAndUpdateActiveRmWebAddress(provider), queueName, provider)
.get(),
getYarnResource(queue.map(node -> node.path("resourcesUsed")), queueName).get(),
queueInfo.path("maxApps").asInt(),
queueInfo.path("numPendingApps").asInt(),
Expand Down Expand Up @@ -323,7 +349,8 @@ public List<ExternalAppInfo> requestAppInfo(
+ "&states="
+ YarnAppState.RUNNING.getState()
+ ","
+ YarnAppState.ACCEPTED.getState();
+ YarnAppState.ACCEPTED.getState()
+ RMConfiguration.YARN_APPS_FILTER_PARMS.getValue();
resp =
getResponseByUrl("apps" + queryParams, rmWebAddress, provider).path("apps").path("app");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import org.apache.linkis.manager.persistence.{
ResourceManagerPersistence
}
import org.apache.linkis.manager.rm.domain.RMLabelContainer
import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier
import org.apache.linkis.manager.rm.external.service.ExternalResourceService
import org.apache.linkis.manager.rm.external.yarn.{YarnAppInfo, YarnResourceIdentifier}
import org.apache.linkis.manager.rm.restful.vo.{UserCreatorEngineType, UserResourceVo}
Expand Down Expand Up @@ -574,6 +575,174 @@ class RMMonitorRest extends Logging {
appendMessageData(message, "queues", clusters)
}

@ApiOperation(value = "getBatchQueueResource", notes = "get batch queue resource")
@RequestMapping(path = Array("batchqueueresources"), method = Array(RequestMethod.POST))
def getBatchQueueResource(
request: HttpServletRequest,
@RequestBody param: util.Map[String, AnyRef]
): Message = {
ModuleUserUtils.getOperationUser(request, "getBatchQueueResource")
val message = Message.ok("")
val queueNamesParam = param.get("queueNames")
if (queueNamesParam == null) {
return Message.error("queueNames parameter is required")
}
val queueNames = queueNamesParam match {
case list: java.util.List[_] =>
list.asScala.map(_.toString.trim).filter(StringUtils.isNotBlank).toArray
case array: Array[_] =>
array.map(_.toString.trim).filter(StringUtils.isNotBlank)
case _ =>
return Message.error("queueNames parameter must be an array or comma-separated string")
}
if (queueNames.isEmpty) {
return Message.error("queueNames parameter is empty")
}
var clustername = param.get("clustername").asInstanceOf[String]
val crossCluster = java.lang.Boolean.parseBoolean(
param.getOrDefault("crossCluster", "false").asInstanceOf[String]
)
if (crossCluster) {
clustername = AMConfiguration.PRIORITY_CLUSTER_TARGET
}
val clusterLabel = labelFactory.createLabel(classOf[ClusterLabel])
clusterLabel.setClusterName(clustername)
clusterLabel.setClusterType(param.get("clustertype").asInstanceOf[String])
val labelContainer = new RMLabelContainer(Lists.newArrayList(clusterLabel))
val queueInfoMap = new mutable.HashMap[String, AnyRef]()

try {
// Process queue names and create identifiers
import java.util.ArrayList
val identifiers = new ArrayList[ExternalResourceIdentifier]()
queueNames.foreach { queueName =>
var processedQueueName = queueName
if (
StringUtils.isNotBlank(processedQueueName) && processedQueueName.startsWith(queuePrefix)
) {
logger.info(
"Queue name {} starts with '{}', remove '{}'",
processedQueueName,
queuePrefix,
queuePrefix
)
processedQueueName = processedQueueName.substring(queuePrefix.length)
}
identifiers.add(new YarnResourceIdentifier(processedQueueName))
}

// Use batch API to get all queue resources at once
val batchResources =
externalResourceService.getBatchResource(ResourceType.Yarn, labelContainer, identifiers)

// Process the results
import scala.collection.JavaConverters._
batchResources.asScala.foreach { case (queueName, nodeResource) =>
(
nodeResource.getMaxResource.asInstanceOf[YarnResource],
nodeResource.getUsedResource.asInstanceOf[YarnResource]
) match {
case (maxResource, usedResource) =>
val queueInfo = new mutable.HashMap[String, AnyRef]()
queueInfo.put("queuename", maxResource)
queueInfo.put(
"maxResources",
Map(
"memory" -> maxResource.getQueueMemory,
"cores" -> maxResource.getQueueCores
).asJava
)
queueInfo.put(
"usedResources",
Map(
"memory" -> usedResource.getQueueMemory,
"cores" -> usedResource.getQueueCores
).asJava
)
val usedMemoryPercentage = usedResource.getQueueMemory
.asInstanceOf[Double] / maxResource.getQueueMemory.asInstanceOf[Double]
val usedCPUPercentage = usedResource.getQueueCores
.asInstanceOf[Double] / maxResource.getQueueCores.asInstanceOf[Double]
queueInfo.put(
"usedPercentage",
Map("memory" -> usedMemoryPercentage, "cores" -> usedCPUPercentage).asJava
)
queueInfo.put("maxApps", nodeResource.getMaxApps.asInstanceOf[AnyRef])
queueInfo.put("numActiveApps", nodeResource.getNumActiveApps.asInstanceOf[AnyRef])
queueInfo.put("numPendingApps", nodeResource.getNumPendingApps.asInstanceOf[AnyRef])
queueInfoMap.put(queueName, queueInfo.asJava)
case _ =>
logger.warn(s"Failed to get queue resource for $queueName")
}
}
} catch {
case e: Exception =>
logger.error("Failed to get batch queue resources", e)
// Fall back to individual requests if batch API fails
queueNames.foreach { queueName =>
try {
var processedQueueName = queueName
if (
StringUtils
.isNotBlank(processedQueueName) && processedQueueName.startsWith(queuePrefix)
) {
processedQueueName = processedQueueName.substring(queuePrefix.length)
}
val yarnIdentifier = new YarnResourceIdentifier(processedQueueName)
val providedYarnResource =
externalResourceService.getResource(ResourceType.Yarn, labelContainer, yarnIdentifier)
(
providedYarnResource.getMaxResource.asInstanceOf[YarnResource],
providedYarnResource.getUsedResource.asInstanceOf[YarnResource]
) match {
case (maxResource, usedResource) =>
val queueInfo = new mutable.HashMap[String, AnyRef]()
queueInfo.put(
"maxResources",
Map(
"memory" -> maxResource.getQueueMemory,
"cores" -> maxResource.getQueueCores
).asJava
)
queueInfo.put(
"usedResources",
Map(
"memory" -> usedResource.getQueueMemory,
"cores" -> usedResource.getQueueCores
).asJava
)
val usedMemoryPercentage = usedResource.getQueueMemory
.asInstanceOf[Double] / maxResource.getQueueMemory.asInstanceOf[Double]
val usedCPUPercentage = usedResource.getQueueCores
.asInstanceOf[Double] / maxResource.getQueueCores.asInstanceOf[Double]
queueInfo.put(
"usedPercentage",
Map("memory" -> usedMemoryPercentage, "cores" -> usedCPUPercentage).asJava
)
queueInfo.put("maxApps", providedYarnResource.getMaxApps.asInstanceOf[AnyRef])
queueInfo.put(
"numActiveApps",
providedYarnResource.getNumActiveApps.asInstanceOf[AnyRef]
)
queueInfo.put(
"numPendingApps",
providedYarnResource.getNumPendingApps.asInstanceOf[AnyRef]
)
queueInfoMap.put(queueName, queueInfo.asJava)
case _ =>
logger.warn(s"Failed to get queue resource for $queueName")
}
} catch {
case ex: Exception =>
logger.error(s"Failed to get queue resource for $queueName", ex)
}
}
}

appendMessageData(message, "queueInfos", queueInfoMap.asJava)
message
}

private def getEngineNodesByUserList(
userList: List[String],
withResource: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,9 @@ public class RMConfiguration {

public static final CommonVars<Boolean> YARN_APPS_FILTER_ENABLED =
CommonVars.apply("wds.linkis.rm.yarn.apps.filter.enabled", true);

public static final CommonVars<String> YARN_APPS_FILTER_PARMS =
CommonVars.apply(
"wds.linkis.rm.yarn.apps.filter.parms",
"&deSelects=resourceRequests,timeouts,appNodeLabelExpression,amNodeLabelExpression,resourceInfo");
}
Loading