|
#' @title ClusterFunctions for HyperQueue
#'
#' @description
#' Cluster functions for HyperQueue (\url{https://it4innovations.github.io/hyperqueue/stable/}).
#'
#' Jobs are submitted via the HyperQueue CLI using \code{hq submit} and executed by
#' calling \code{Rscript -e "batchtools::doJobCollection(...)"}.
#' The job name is set to the job hash and logs are handled internally by batchtools.
#' Listing jobs uses \code{hq job list} and cancelling jobs uses \code{hq job cancel}.
#' A running HyperQueue server and workers are required.
#'
#' @inheritParams makeClusterFunctions
#' @return [ClusterFunctions].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsHyperQueue = function(scheduler.latency = 1, fs.latency = 65) {

  submitJob = function(reg, jc) {
    assertRegistry(reg, writeable = TRUE)
    assertClass(jc, "JobCollection")

    # Map batchtools resources to hq CLI flags; unset resources are simply omitted
    # (the 'if' without 'else' yields NULL, which c() drops from 'args' below).
    ncpus = if (!is.null(jc$resources$ncpus)) sprintf("--cpus=%i", jc$resources$ncpus)
    memory = if (!is.null(jc$resources$memory)) sprintf("--resource mem=%iMiB", jc$resources$memory)
    # time-limit is the maximum time the job can run, time-request is the minimum
    # remaining lifetime a worker must have to be eligible for the job
    walltime = if (!is.null(jc$resources$walltime)) sprintf("--time-limit=%is --time-request=%is", jc$resources$walltime, jc$resources$walltime)

    args = c(
      "submit",
      sprintf("--name=%s", jc$job.hash),
      # hyperqueue cannot write stdout and stderr to the same file;
      # logging is handled by batchtools itself via jc$log.file instead
      "--stdout=none",
      "--stderr=none",
      ncpus,
      memory,
      walltime,
      "--",
      "Rscript", "-e",
      shQuote(sprintf("batchtools::doJobCollection('%s', '%s')", jc$uri, jc$log.file))
    )
    res = runOSCommand("hq", args)
    if (res$exit.code > 0L) {
      return(cfHandleUnknownSubmitError("hq", res$exit.code, res$output))
    }
    # 'hq submit' reports e.g. "Job submitted successfully, job ID: 42".
    # Extract the ID only from lines that actually contain one: applying sub()
    # to every output line would return non-matching lines unchanged and
    # silently produce bogus batch ids.
    matched = grep("job ID: [0-9]+", res$output, value = TRUE)
    if (length(matched) == 0L) {
      return(cfHandleUnknownSubmitError("hq", res$exit.code, res$output))
    }
    batch.id = sub(".*job ID: ([0-9]+).*", "\\1", matched[1L])
    makeSubmitJobResult(status = 0L, batch.id = batch.id)
  }

  killJob = function(reg, batch.id) {
    assertRegistry(reg, writeable = TRUE)
    assertString(batch.id)
    res = runOSCommand("hq", c("job", "cancel", batch.id))
    if (res$exit.code > 0L) {
      OSError("Killing of job failed", res)
    }
    # killJob has no meaningful return value; the original returned a
    # SubmitJobResult here, which was a copy-paste leftover from submitJob.
    invisible(TRUE)
  }

  # List batch ids of jobs matching the given 'hq job list' state filter
  # (e.g. "waiting" or "running"). Returns character(0) if no jobs match.
  listJobs = function(reg, filter) {
    assertRegistry(reg, writeable = FALSE)
    if (!requireNamespace("jsonlite", quietly = TRUE)) {
      stop("Package 'jsonlite' is required to list HyperQueue jobs")
    }
    args = c("job", "list", "--filter", filter, "--output-mode", "json")
    res = runOSCommand("hq", args)
    if (res$exit.code > 0L) {
      OSError("Listing of jobs failed", res)
    }
    # runOSCommand returns the output as a character vector of lines;
    # collapse before parsing so multi-line JSON is handled correctly.
    jobs = jsonlite::fromJSON(paste0(res$output, collapse = "\n"))
    # 'jobs$id' is NULL for an empty job list; as.character() maps that to character(0)
    as.character(jobs$id)
  }

  listJobsQueued = function(reg) {
    listJobs(reg, "waiting")
  }

  listJobsRunning = function(reg) {
    listJobs(reg, "running")
  }

  makeClusterFunctions(
    name = "HyperQueue",
    submitJob = submitJob,
    killJob = killJob,
    listJobsRunning = listJobsRunning,
    listJobsQueued = listJobsQueued,
    store.job.collection = TRUE,
    scheduler.latency = scheduler.latency,
    fs.latency = fs.latency)
}
0 commit comments