Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ List<ExternalAppInfo> getAppInfo(
ExternalResourceIdentifier identifier)
throws RMErrorException;

Map<String, NodeResource> getBatchResource(
ResourceType resourceType,
RMLabelContainer labelContainer,
List<ExternalResourceIdentifier> identifiers)
throws RMErrorException;

ExternalResourceProvider chooseProvider(
ResourceType resourceType, RMLabelContainer labelContainer) throws RMErrorException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.linkis.manager.rm.external.service.impl;

import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.ResourceType;
import org.apache.linkis.manager.common.exception.RMErrorException;
Expand All @@ -33,6 +34,8 @@
import org.apache.linkis.manager.rm.external.parser.YarnResourceIdentifierParser;
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
import org.apache.linkis.manager.rm.external.service.ExternalResourceService;
import org.apache.linkis.manager.rm.external.yarn.YarnQueueInfo;
import org.apache.linkis.manager.rm.external.yarn.YarnResourceIdentifier;
import org.apache.linkis.manager.rm.external.yarn.YarnResourceRequester;
import org.apache.linkis.manager.rm.utils.RMUtils;

Expand All @@ -44,11 +47,13 @@

import java.net.ConnectException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonParseException;
import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -141,6 +146,60 @@ public List<ExternalAppInfo> getAppInfo(
return appInfos;
}

@Override
public Map<String, NodeResource> getBatchResource(
ResourceType resourceType,
RMLabelContainer labelContainer,
List<ExternalResourceIdentifier> identifiers)
throws RMErrorException {
ExternalResourceProvider provider = chooseProvider(resourceType, labelContainer);
ExternalResourceRequester externalResourceRequester = getRequester(resourceType);

if (externalResourceRequester instanceof YarnResourceRequester) {
YarnResourceRequester yarnRequester = (YarnResourceRequester) externalResourceRequester;
List<String> queueNames =
identifiers.stream()
.map(id -> ((YarnResourceIdentifier) id).getQueueName())
.collect(Collectors.toList());

Map<String, YarnQueueInfo> batchResources =
(Map<String, YarnQueueInfo>)
retry(
RMConfiguration.EXTERNAL_RETRY_NUM.getValue(),
(i) ->
yarnRequester.getBatchResources(
yarnRequester.getAndUpdateActiveRmWebAddress(provider),
queueNames,
provider),
(i) -> yarnRequester.reloadExternalResourceAddress(provider));

Map<String, NodeResource> result = new HashMap<>();
batchResources.forEach(
(queueName, queueInfo) -> {
CommonNodeResource nodeResource = new CommonNodeResource();
nodeResource.setMaxResource(queueInfo.getMaxResource());
nodeResource.setUsedResource(queueInfo.getUsedResource());
nodeResource.setMaxApps(queueInfo.getMaxApps());
nodeResource.setNumPendingApps(queueInfo.getNumPendingApps());
nodeResource.setNumActiveApps(queueInfo.getNumActiveApps());
result.put(queueName, nodeResource);
});
return result;
} else {
// For other resource types, fall back to individual requests
Map<String, NodeResource> result = new HashMap<>();
for (ExternalResourceIdentifier identifier : identifiers) {
try {
NodeResource resource = getResource(resourceType, labelContainer, identifier);
result.put(((YarnResourceIdentifier) identifier).getQueueName(), resource);
} catch (Exception e) {
logger.error("Failed to get resource for identifier " + identifier, e);
}
}
return result;
}
}

private Object retry(int retryNum, Function function, Function reloadExternalAddress)
throws RMErrorException {
int times = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.rm.external.yarn;

/** Enumeration of Yarn application states. */
public enum YarnAppState {

/** Application has been submitted but not yet accepted by ResourceManager. */
NEW("NEW"),

/** Application is being saved to persistent storage. */
NEW_SAVING("NEW_SAVING"),

/** Application has been submitted and is waiting for scheduling. */
SUBMITTED("SUBMITTED"),

/** Application has been accepted by ResourceManager and waiting for resource allocation. */
ACCEPTED("ACCEPTED"),

/** Application is running with at least one container executing. */
RUNNING("RUNNING"),

/** Application has completed successfully. */
FINISHED("FINISHED"),

/** Application execution has failed. */
FAILED("FAILED"),

/** Application has been manually terminated. */
KILLED("KILLED");

private final String state;

YarnAppState(String state) {
this.state = state;
}

public String getState() {
return state;
}

/**
* Check if the state is active (RUNNING or ACCEPTED). These states represent applications that
* are consuming or about to consume cluster resources.
*
* @return true if the state is active
*/
public boolean isActive() {
return this == RUNNING || this == ACCEPTED;
}

/**
* Parse string to YarnAppState enum.
*
* @param state the state string
* @return YarnAppState enum
*/
public static YarnAppState fromString(String state) {
for (YarnAppState appState : YarnAppState.values()) {
if (appState.state.equals(state)) {
return appState;
}
}
throw new IllegalArgumentException("Unknown YarnAppState: " + state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.linkis.manager.rm.external.yarn;

import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration;
import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.ResourceType;
Expand Down Expand Up @@ -217,6 +218,31 @@ public YarnQueueInfo getResources(
String queueName,
ExternalResourceProvider provider) {
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
return getResourcesFromResponse(resp, realQueueName, queueName, provider);
}

public Map<String, YarnQueueInfo> getBatchResources(
String rmWebAddress, List<String> queueNames, ExternalResourceProvider provider) {
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
Map<String, YarnQueueInfo> queueInfoMap = new HashMap<>();
for (String queueName : queueNames) {
try {
String realQueueName = queuePrefix + queueName;
if (queueName.startsWith(queuePrefix)) {
realQueueName = queueName;
}
YarnQueueInfo queueInfo =
getResourcesFromResponse(resp, realQueueName, queueName, provider);
queueInfoMap.put(queueName, queueInfo);
} catch (Exception e) {
logger.error("Failed to get resource for queue " + queueName, e);
}
}
return queueInfoMap;
}

private YarnQueueInfo getResourcesFromResponse(
JsonNode resp, String realQueueName, String queueName, ExternalResourceProvider provider) {
JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo");
String schedulerType = schedulerInfo.path("type").asText();
if ("capacityScheduler".equals(schedulerType)) {
Expand All @@ -232,7 +258,8 @@ public YarnQueueInfo getResources(
}
JsonNode queueInfo = queue.get();
return new YarnQueueInfo(
maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(),
maxEffectiveHandle(queue, getAndUpdateActiveRmWebAddress(provider), queueName, provider)
.get(),
getYarnResource(queue.map(node -> node.path("resourcesUsed")), queueName).get(),
queueInfo.path("maxApps").asInt(),
queueInfo.path("numPendingApps").asInt(),
Expand Down Expand Up @@ -313,8 +340,23 @@ public List<ExternalAppInfo> requestAppInfo(

String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
String realQueueName = queuePrefix + queueName;
JsonNode resp;
if (RMConfiguration.YARN_APPS_FILTER_ENABLED.getValue()) {
// Build query parameters to filter apps at Yarn API level using active states only
String queryParams =
"?queue="
+ realQueueName
+ "&states="
+ YarnAppState.RUNNING.getState()
+ ","
+ YarnAppState.ACCEPTED.getState();
resp =
getResponseByUrl("apps" + queryParams, rmWebAddress, provider).path("apps").path("app");
} else {
// Fetch all apps without filtering (for backward compatibility)
resp = getResponseByUrl("apps", rmWebAddress, provider).path("apps").path("app");
}

JsonNode resp = getResponseByUrl("apps", rmWebAddress, provider).path("apps").path("app");
if (resp.isMissingNode()) {
return new ArrayList<>();
}
Expand Down
Loading
Loading