Skip to content

Removing workaround to get ARC CE Status #8354

@aldbr

Description

@aldbr

The problem is explained in https://bugzilla.nordugrid.org/show_bug.cgi?id=4279
In summary, there is a discrepancy when getting CE details: depending on the ARC instance, we sometimes get a list containing a single dict, and sometimes the dict itself.
This is a known issue that has been patched, but the ARC instances need to be updated to pick up the fix.

In the meantime, we set up a temporary workaround: #8325
Communities should periodically run the following script to check whether the workaround is still needed:

import concurrent.futures

import requests
import typer

import DIRAC
from DIRAC import gLogger, S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers import Operations, Registry
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues
from DIRAC.Core.Base.Script import Script
from DIRAC.Core.Security.ProxyInfo import getVOfromProxyGroup
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
from DIRAC.Resources.Computing.AREXComputingElement import AREXComputingElement
from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES



class AREXFixComputingElement(AREXComputingElement):
    """AREX CE whose getCEStatus() only normalises the ``ComputingShare`` dict-vs-list discrepancy.

    ``AdminDomain`` and ``ComputingService`` are still indexed with ``[0]``: when an
    ARC instance publishes them as a dict instead of a list, the lookup fails and
    "Failed extracting the queue information from the CE status" is returned.
    That message is what this script uses to spot CEs that still need the workaround.
    """

    def getCEStatus(self):
        """Query the CE ``info`` endpoint (GLUE2 schema) and extract job counts for ``self.queue``.

        :return: S_ERROR on failure; otherwise an S_OK dict on which ``SubmittedJobs``
                 (always 0), ``RunningJobs`` and ``WaitingJobs`` are set as top-level keys.
        """
        result = self._checkSession()
        if not result["OK"]:
            self.log.error("Cannot get CE Status", result["Message"])
            return result

        # Find out which VO we are running for.
        # Essential now for REST interface: the matching queue ID ends with "<queue>_<vo>".
        result = getVOfromProxyGroup()
        if not result["OK"]:
            return result
        if not result["Value"]:
            return S_ERROR("Could not get VO value from the proxy group")
        vo = result["Value"]

        # Prepare the command and submit the GET request
        params = {"schema": "glue2"}
        query = self._urlJoin("info")
        result = self._request("get", query, params=params)
        if not result["OK"]:
            self.log.error("Failed getting the status of the CE.", result["Message"])
            return S_ERROR("Failed getting the status of the CE")
        response = result["Value"]
        try:
            ceData = response.json()
        except requests.JSONDecodeError:
            self.log.exception("Failed decoding the status of the CE")
            return S_ERROR("Failed decoding the status of the CE")

        # Look only in the relevant section out of the headache.
        # KeyError: missing key, or a dict where a list was expected ([0] on a dict);
        # IndexError: empty list; TypeError: unexpected container type.
        # All of them mean the expected structure is not there.
        try:
            queueInfo = ceData["Domains"]["AdminDomain"][0]["Services"]["ComputingService"][0]["ComputingShare"]
        except (KeyError, IndexError, TypeError) as exc:
            self.log.error("Failed extracting the queue information from the CE status", repr(exc))
            return S_ERROR("Failed extracting the queue information from the CE status")
        # Workaround kept on purpose: ComputingShare may be a single dict instead of a list
        if not isinstance(queueInfo, list):
            queueInfo = [queueInfo]

        result = S_OK()
        result["SubmittedJobs"] = 0

        # I have only seen the VO published in lower case ...
        magic = self.queue + "_" + vo.lower()
        for qi in queueInfo:
            # .get: a share without an ID cannot match, it must not raise
            if qi.get("ID", "").endswith(magic):
                result["RunningJobs"] = int(qi["RunningJobs"])
                result["WaitingJobs"] = int(qi["WaitingJobs"]) + int(qi["StagingJobs"]) + int(qi["PreLRMSWaitingJobs"])
                break  # Pick the first (should be only ...) matching queue + VO
        else:
            return S_ERROR(f"Could not find the queue {self.queue} associated to VO {vo}")

        return result


# =============================================================================

# Typer application holding the command(s) of this script.
app = typer.Typer(
    help="Test the interactions with a given set of Computing Elements (CE)",
)


def findGenericCreds(vo: str):
    """
    Look up the generic pilot DN and group configured for a VO.

    The DN is read from ``Pilot/GenericPilotDN``; when that option is empty, the
    first DN registered for the ``Pilot/GenericPilotUser`` user is used instead.

    :param vo: The Virtual Organization whose Operations section is read.
    :return: A ``(pilotDN, pilotGroup)`` tuple; either element may be empty.
    """
    ops = Operations.Operations(vo=vo)
    group = ops.getValue("Pilot/GenericPilotGroup", "")
    dn = ops.getValue("Pilot/GenericPilotDN", "")
    if dn:
        return dn, group

    # Fallback: resolve the DN from the configured generic pilot user name
    user = ops.getValue("Pilot/GenericPilotUser", "")
    if not user:
        return dn, group
    dnLookup = Registry.getDNForUsername(user)
    if dnLookup["OK"]:
        dn = dnLookup["Value"][0]
    return dn, group

def getCredentials(pilotDN: str, pilotGroup: str, ce: ComputingElement):
    """
    Fetch the pilot proxy and the pilot token needed to talk to a CE.

    :param pilotDN: DN of the generic pilot identity.
    :param pilotGroup: DIRAC group of the generic pilot identity.
    :param ce: The CE the token is requested for (its ``audienceName`` is the audience).
    :return: A ``(proxy, token)`` tuple, or ``(None, None)`` if either cannot be obtained.
    """
    # 3600: presumably the requested proxy lifetime in seconds -- confirm in ProxyManagerClient
    proxyResult = gProxyManager.getPilotProxyFromDIRACGroup(pilotDN, pilotGroup, 3600)
    if not proxyResult["OK"]:
        gLogger.error("Cannot get pilot proxy", proxyResult["Message"])
        return None, None

    tokenResult = gTokenManager.getToken(
        userGroup=pilotGroup,
        scope=PILOT_SCOPES,
        audience=ce.audienceName,
        requiredTimeLeft=1200,
    )
    if not tokenResult["OK"]:
        gLogger.error("Cannot get pilot token", tokenResult["Message"])
        return None, None

    return proxyResult["Value"], tokenResult["Value"]


def buildQueues(vo: str) -> dict:
    """
    Build one configured AREXFixComputingElement per AREX CE of the given community.

    Only the first queue declared for each CE is configured (and therefore tested).
    CEs that declare no queue at all are skipped with a warning.

    :param vo: The Virtual Organization whose queues are looked up.
    :return: A dictionary mapping CE name to its configured CE object
             (empty if the queues could not be retrieved).
    """
    result = getQueues(
        community=vo,
        ceTypeList=["AREX"],
    )
    if not result["OK"]:
        gLogger.error("Cannot get queues", result["Message"])
        return {}

    # getQueues() returns a two-level mapping (presumably site -> CE -> params);
    # flatten it to CE -> params.
    arexCEs = {}
    for ceMapping in result["Value"].values():
        arexCEs.update(ceMapping)

    resultCEs = {}
    for ceName, ceParams in arexCEs.items():
        queues = ceParams.get("Queues", {})
        if not queues:
            # next(iter({})) would raise StopIteration and abort the whole script
            gLogger.warn("No queue defined, skipping CE", ceName)
            continue
        # Only the first declared queue of the CE is tested
        ceParams["CEQueueName"] = next(iter(queues))
        ceParams["Queue"] = ceParams["CEQueueName"]
        ce = AREXFixComputingElement(ceName)
        ce.setParameters(ceParams)
        resultCEs[ceName] = ce

    return resultCEs


def interactWithCE(ce: ComputingElement):
    """
    Query the status of a Computing Element (CE) and record the outcome.

    Any exception raised while querying the CE is caught and recorded as a failed
    check, so one misbehaving CE cannot abort the report for all the others.

    :param ce: The configured Computing Element to query.
    :return: A dict mapping each check name (currently only "ce_status") to
             ``{"OK": bool, "Message": str}``.
    """
    checks = {
        "ce_status": {"OK": False, "Message": ""},
    }

    # Get CE Status
    gLogger.info(f"[{ce.ceName}]Getting CE status")
    try:
        result = ce.getCEStatus()
    except Exception as exc:  # per-CE boundary: record the failure instead of propagating
        gLogger.exception(f"[{ce.ceName}]Unexpected error while getting CE status")
        checks["ce_status"]["Message"] = f"Unexpected error: {exc!r}"
        return checks

    # A failed status on an HTCondor CE is deliberately not counted as an issue
    if not result["OK"] and ce.ceType != "HTCondor":
        gLogger.error(f"[{ce.ceName}]Cannot get CE status: {result['Message']}")
        checks["ce_status"]["Message"] = result["Message"]
        return checks
    checks["ce_status"]["OK"] = True

    return checks


@app.command()
def main(
    vo: str = typer.Argument(help="Select Virtual Organization"),
    log_level: str = typer.Option("INFO", "--log-level", help="Set the log level (DEBUG, VERBOSE, INFO)"),
):
    """Test the interactions with a given set of Computing Elements (CE)."""
    Script.initialize()

    if log_level:
        gLogger.setLevel(log_level.upper())
        # If you set a log level for a specific backend and want more details to debug
        # then uncomment the next line
        # gLogger._backendsList[0]._handler.setLevel(log_level.upper())

    # Get credentials for the given VO
    pilotDN, pilotGroup = findGenericCreds(vo)
    if not pilotDN or not pilotGroup:
        gLogger.error("Cannot get pilot credentials")
        DIRAC.exit(1)

    # Get the queues
    ceDict = buildQueues(vo=vo)
    if not ceDict:
        gLogger.error("Cannot get queues")
        DIRAC.exit(1)

    # human-friendly names for each check
    pretty = {
        "ce_status": "reported CE status",
    }

    def process_queue(ceName):
        """Attach pilot credentials to one CE and run the checks on it.

        This runs in a worker thread. Anything raised here (including the
        SystemExit of DIRAC.exit) is re-raised by executor.map while results are
        collected, which stops the run before the summary is printed. Failures
        are therefore returned as failed checks instead.
        """
        ce = ceDict[ceName]
        if ce.ceType != "SSH":
            gLogger.info(f"Getting creds for CE: {ce.ceName} ({ce.ceType})")
            proxy, token = getCredentials(pilotDN, pilotGroup, ce)
            if not proxy or not token:
                return ceName, {check: {"OK": False, "Message": "Cannot get pilot credentials"} for check in pretty}
            ce.setProxy(proxy)
            ce.setToken(token)
        gLogger.info(f"Interacting with CE: {ce.ceName} ({ce.ceType})")
        return ceName, interactWithCE(ce)

    # CE queries mostly wait on the network, so run them concurrently in threads
    with concurrent.futures.ThreadPoolExecutor() as executor:
        overallState = dict(executor.map(process_queue, ceDict))

    gLogger.info("Overall interaction state:")
    total = len(overallState)
    for check, description in pretty.items():
        failed = {name: state[check] for name, state in overallState.items() if not state[check]["OK"]}
        issueCount = len(failed)
        okCount = total - issueCount
        pct = int(okCount / total * 100) if total else 0
        typer.echo(f"- {pct}% of the queues correctly {description}. Issues with {issueCount} queue(s):")
        for qname, qCheck in failed.items():
            msg = qCheck["Message"] or "unknown error"
            typer.echo(f"  - {qname}: {msg}")
        typer.echo("")


if __name__ == "__main__":
    # Run the Typer CLI when the file is executed as a script.
    app()

Once the ARC instances are up to date, we can adapt the code: that is the case when the message "Failed extracting the queue information from the CE status" no longer appears in the output of this script.

Note: you need to run this script from a DIRAC client as an admin.

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions