From 303a46c5527c976f7999db804ef5fd7b16f1e353 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 15 Jun 2025 16:20:28 +0200 Subject: [PATCH 1/9] Code formatting --- R/aaa-rstudio-detect.R | 16 +++- R/check.R | 36 ++++++-- R/error.R | 6 +- R/hook.R | 4 +- R/load-client.R | 20 +++-- R/r-process.R | 4 +- R/r-session.R | 117 +++++++++++++++++-------- R/result.R | 4 +- R/rscript.R | 8 +- R/script.R | 4 +- R/setup.R | 70 +++++++++++---- R/standalone-errors.R | 109 +++++++++++++++++------ R/test-helpers.R | 12 ++- R/utils.R | 20 +++-- tests/testthat/helper.R | 8 +- tests/testthat/test-archs.R | 32 +++++-- tests/testthat/test-clean-subprocess.R | 12 ++- tests/testthat/test-eval.R | 4 +- tests/testthat/test-libpath.R | 4 +- tests/testthat/test-load-client.R | 4 +- tests/testthat/test-options.R | 6 +- tests/testthat/test-r-session.R | 23 +++-- 22 files changed, 381 insertions(+), 142 deletions(-) diff --git a/R/aaa-rstudio-detect.R b/R/aaa-rstudio-detect.R index 0baa0331..07630094 100644 --- a/R/aaa-rstudio-detect.R +++ b/R/aaa-rstudio-detect.R @@ -51,8 +51,12 @@ rstudio <- local({ } # Cached? - if (clear_cache) data <<- NULL - if (!is.null(data)) return(get_caps(data)) + if (clear_cache) { + data <<- NULL + } + if (!is.null(data)) { + return(get_caps(data)) + } if ( (rspid <- Sys.getenv("RSTUDIO_SESSION_PID")) != "" && @@ -86,7 +90,9 @@ rstudio <- local({ pane <- Sys.getenv("RSTUDIO_CHILD_PROCESS_PANE") # this should not happen, but be defensive and fall back - if (pane == "") return(detect_old(clear_cache)) + if (pane == "") { + return(detect_old(clear_cache)) + } # direct subprocess new$type <- if (rspid == parentpid) { @@ -175,7 +181,9 @@ rstudio <- local({ } installing <- Sys.getenv("R_PACKAGE_DIR", "") - if (cache && installing == "") data <<- new + if (cache && installing == "") { + data <<- new + } get_caps(new) } diff --git a/R/check.R b/R/check.R index fa5d6eb4..94e568aa 100644 --- a/R/check.R +++ b/R/check.R @@ -21,27 +21,45 @@ convert_and_check_my_args <- function(options) { ## Conversions options <- within(options, { - if (has("libpath")) libpath <- as.character(libpath) - if (has("repos")) repos <- repos + if (has("libpath")) { + libpath <- as.character(libpath) + } + if (has("repos")) { + repos <- repos + } if (has("stdout") && !is.null(stdout)) { stdout <- as.character(stdout) } if (has("stderr") && !is.null(stderr)) { stderr <- as.character(stderr) } - if (has("error")) error <- error[1] - if (has("cmdargs")) cmdargs <- as.character(cmdargs) + if (has("error")) { + error <- error[1] + } + if (has("cmdargs")) { + cmdargs <- as.character(cmdargs) + } if (has("timeout") && !inherits(timeout, "difftime")) { timeout <- as.difftime( as.double(timeout), units = "secs" ) } - if (no("wd")) wd <- "." - if (no("echo")) echo <- FALSE - if (no("fail_on_status")) fail_on_status <- FALSE - if (no("tmp_files")) tmp_files <- character() - if (no("package")) package <- FALSE + if (no("wd")) { + wd <- "." + } + if (no("echo")) { + echo <- FALSE + } + if (no("fail_on_status")) { + fail_on_status <- FALSE + } + if (no("tmp_files")) { + tmp_files <- character() + } + if (no("package")) { + package <- FALSE + } if (no("arch")) arch <- "same" }) diff --git a/R/error.R b/R/error.R index 7c243b22..c58f9b28 100644 --- a/R/error.R +++ b/R/error.R @@ -12,7 +12,11 @@ new_callr_crash_error <- function(out, msg = NULL) { error_msg <- paste0( if (out$timeout) "callr timed out" else "callr subprocess failed", - if (!is.null(msg)) paste0(": ", msg) else if (!out$timeout) ":" + if (!is.null(msg)) { + paste0(": ", msg) + } else if (!out$timeout) { + ":" + } ) cond <- new_error(paste(error_msg)) diff --git a/R/hook.R b/R/hook.R index fd91a395..c5241b1b 100644 --- a/R/hook.R +++ b/R/hook.R @@ -2,7 +2,9 @@ common_hook <- function() { substitute( { # This should not happen in a new R session, but just to be safe - while ("tools:callr" %in% search()) detach("tools:callr") + while ("tools:callr" %in% search()) { + detach("tools:callr") + } env <- readRDS(`__envfile__`) do.call("attach", list(env, pos = length(search()), name = "tools:callr")) data <- env$`__callr_data__` diff --git a/R/load-client.R b/R/load-client.R index 8940cafd..703131e2 100644 --- a/R/load-client.R +++ b/R/load-client.R @@ -13,7 +13,9 @@ load_client_lib <- function(sofile = NULL, pxdir = NULL) { paste0("client", ext), package = "processx" ) - if (sofile != "" && file.exists(sofile)) return(sofile) + if (sofile != "" && file.exists(sofile)) { + return(sofile) + } # Try this as well, this is for devtools/pkgload sofile <- system.file( @@ -21,7 +23,9 @@ load_client_lib <- function(sofile = NULL, pxdir = NULL) { paste0("client", ext), package = "processx" ) - if (sofile != "" && file.exists(sofile)) return(sofile) # nocov + if (sofile != "" && file.exists(sofile)) { + return(sofile) + } # nocov # stop() here and not throw(), because this function should be standalone stop("Cannot find client file") @@ -87,13 +91,19 @@ load_client_lib <- function(sofile = NULL, pxdir = NULL) { env$disable_fd_inheritance <- function() mycall(sym_disinh) env$write_fd <- function(fd, data) { - if (is.character(data)) data <- charToRaw(paste0(data, collapse = "")) + if (is.character(data)) { + data <- charToRaw(paste0(data, collapse = "")) + } len <- length(data) repeat { written <- mycall(sym_write, fd, data) len <- len - written - if (len == 0) break - if (written) data <- data[-(1:written)] + if (len == 0) { + break + } + if (written) { + data <- data[-(1:written)] + } Sys.sleep(.1) } } diff --git a/R/r-process.R b/R/r-process.R index 6cc37b77..9a82cd64 100644 --- a/R/r-process.R +++ b/R/r-process.R @@ -79,7 +79,9 @@ rp_init <- function(self, private, super, options) { } rp_get_result <- function(self, private) { - if (self$is_alive()) throw(new_error("Still alive")) + if (self$is_alive()) { + throw(new_error("Still alive")) + } ## This is artificial... out <- list( diff --git a/R/r-session.R b/R/r-session.R index 45bc150e..26419e26 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -61,15 +61,18 @@ r_session <- R6::R6Class( options = r_session_options(), wait = TRUE, wait_timeout = 3000 - ) rs_init(self, private, super, options, wait, wait_timeout), + ) { + rs_init(self, private, super, options, wait, wait_timeout) + }, #' @description #' Similar to [r()], but runs the function in a permanent background #' R session. It throws an error if the function call generated an #' error in the child process. #' @return The return value of the R expression. - run = function(func, args = list(), package = FALSE) - rs_run(self, private, func, args, package), + run = function(func, args = list(), package = FALSE) { + rs_run(self, private, func, args, package) + }, #' @description #' Similar to `$run()`, but returns the standard output and error of @@ -86,15 +89,17 @@ r_session <- R6::R6Class( #' error thrown in the subprocess. Otherwise it is `NULL`. #' * `code`, `message`: These fields are used by call internally and #' you can ignore them. - run_with_output = function(func, args = list(), package = FALSE) - rs_run_with_output(self, private, func, args, package), + run_with_output = function(func, args = list(), package = FALSE) { + rs_run_with_output(self, private, func, args, package) + }, #' @description #' Starts running a function in the background R session, and #' returns immediately. To check if the function is done, call the #' `poll_process()` method. - call = function(func, args = list(), package = FALSE) - rs_call(self, private, func, args, package), + call = function(func, args = list(), package = FALSE) { + rs_call(self, private, func, args, package) + }, #' @description #' Poll the R session with a timeout. If the session has finished the @@ -232,10 +237,12 @@ r_session <- R6::R6Class( read_buffer = function() rs__read_buffer(self, private), read_message = function() rs__read_message(self, private), - get_result_and_output = function(std = FALSE) - rs__get_result_and_output(self, private, std), - report_back = function(code, text = "") - rs__report_back(self, private, code, text), + get_result_and_output = function(std = FALSE) { + rs__get_result_and_output(self, private, std) + }, + report_back = function(code, text = "") { + rs__report_back(self, private, code, text) + }, write_for_sure = function(text) rs__write_for_sure(self, private, text), parse_msg = function(msg) rs__parse_msg(self, private, msg), attach_wait = function() rs__attach_wait(self, private) @@ -244,8 +251,7 @@ r_session <- R6::R6Class( rs_init <- function(self, private, super, options, wait, wait_timeout) { options$func <- options$func %||% - function() { - } + function() {} options$args <- list() options$load_hook <- session_load_hook(options$load_hook) @@ -283,15 +289,22 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { private$state <- "starting" if (wait) { + otel::start_span("r_session$initialize() wait", session = otel_session) timeout <- wait_timeout have_until <- Sys.time() + as.difftime(timeout / 1000, units = "secs") pr <- self$poll_io(timeout) out <- "" err <- "" while (any(pr == "ready")) { - if (pr["output"] == "ready") out <- paste0(out, self$read_output()) - if (pr["error"] == "ready") err <- paste0(err, self$read_error()) - if (pr["process"] == "ready") break + if (pr["output"] == "ready") { + out <- paste0(out, self$read_output()) + } + if (pr["error"] == "ready") { + err <- paste0(err, self$read_error()) + } + if (pr["process"] == "ready") { + break + } timeout <- as.double(have_until - Sys.time(), units = "secs") * 1000 pr <- self$poll_io(as.integer(timeout)) } @@ -328,7 +341,9 @@ rs_read <- function(self, private) { out <- private$read_message() } if (!length(out)) { - if (processx::processx_conn_is_incomplete(private$pipe)) return() + if (processx::processx_conn_is_incomplete(private$pipe)) { + return() + } if (self$is_alive()) { # We do this in on.exit(), because parse_msg still reads the streams on.exit(self$kill(), add = TRUE) @@ -386,7 +401,9 @@ rs__read_buffer <- function(self, private) { rs__read_message <- function(self, private) { # A new message, we can surely read the first line out <- processx::processx_conn_read_lines(private$pipe, 1) - if (length(out) == 0) return(NULL) + if (length(out) == 0) { + return(NULL) + } header <- rs__parse_header(out) body <- "" @@ -426,7 +443,9 @@ rs_close <- function(self, private, grace) { self$poll_process(grace) self$kill() self$wait(1000) - if (self$is_alive()) throw(new_error("Could not kill background R session")) + if (self$is_alive()) { + throw(new_error("Could not kill background R session")) + } private$state <- "finished" private$fun_started_at <- as.POSIXct(NA) processx::processx_conn_close(private$pipe) @@ -439,9 +458,15 @@ rs_call <- function(self, private, func, args, package) { ## We only allow a new command if the R session is idle. ## This allows keeping a clean state ## TODO: do we need a state at all? - if (private$state == "starting") throw(new_error("R session not ready yet")) - if (private$state == "finished") throw(new_error("R session finished")) - if (private$state == "busy") throw(new_error("R session busy")) + if (private$state == "starting") { + throw(new_error("R session not ready yet")) + } + if (private$state == "finished") { + throw(new_error("R session finished")) + } + if (private$state == "busy") { + throw(new_error("R session busy")) + } ## Save the function in a file private$options$func <- func @@ -502,7 +527,9 @@ rs_run_with_output <- function(self, private, func, args, package) { { processx::poll(list(private$pipe), -1) msg <- self$read() - if (is.null(msg)) next + if (is.null(msg)) { + next + } if (msg$code == 200 || (msg$code >= 500 && msg$code < 600)) { return(msg) } @@ -581,7 +608,9 @@ rs_debug <- function(self, private) { hasdump <- self$run(function() { !is.null(as.environment("tools:callr")$`__callr_data__`$.Last.dump) }) - if (!hasdump) stop("Can't find dumped frames, nothing to debug") + if (!hasdump) { + stop("Can't find dumped frames, nothing to debug") + } help <- function() { cat( @@ -604,7 +633,9 @@ rs_debug <- function(self, private) { translate_cmd <- function(cmd) { if (cmd == ".where") { traceback(tb) - if (frame) cat("Inspecting frame", frame, "\n") + if (frame) { + cat("Inspecting frame", frame, "\n") + } NULL } else if (cmd == ".help") { help() @@ -639,8 +670,12 @@ rs_debug <- function(self, private) { ) cmd <- rs__attach_get_input(prompt) cmd2 <- translate_cmd(cmd) - if (should_quit) break - if (is.null(cmd2)) next + if (should_quit) { + break + } + if (is.null(cmd2)) { + next + } try(update_history(cmd), silent = TRUE) @@ -666,13 +701,19 @@ rs_debug <- function(self, private) { rs_attach <- function(self, private) { out <- self$get_output_connection() err <- self$get_error_connection() - while (nchar(x <- processx::processx_conn_read_chars(out))) cat(x) - while (nchar(x <- processx::processx_conn_read_chars(err))) cat(bold(x)) + while (nchar(x <- processx::processx_conn_read_chars(out))) { + cat(x) + } + while (nchar(x <- processx::processx_conn_read_chars(err))) { + cat(bold(x)) + } tryCatch( { while (TRUE) { cmd <- rs__attach_get_input(paste0("RS ", self$get_pid(), " > ")) - if (cmd == ".q") break + if (cmd == ".q") { + break + } try(update_history(cmd), silent = TRUE) private$write_for_sure(paste0(cmd, "\n")) private$report_back(202, "done") @@ -726,7 +767,9 @@ rs__report_back <- function(self, private, code, text) { rs__write_for_sure <- function(self, private, text) { while (1) { text <- self$write_input(text) - if (!length(text)) break + if (!length(text)) { + break + } Sys.sleep(.1) } } @@ -813,7 +856,7 @@ rs__status_expr <- function(code, text = "", fd = 3L) { } rs__prehook <- function(stdout, stderr) { - oexpr <- if (!is.null(stdout)) + oexpr <- if (!is.null(stdout)) { substitute( { assign( @@ -826,7 +869,8 @@ rs__prehook <- function(stdout, stderr) { }, list(`__fn__` = stdout) ) - eexpr <- if (!is.null(stderr)) + } + eexpr <- if (!is.null(stderr)) { substitute( { assign( @@ -839,6 +883,7 @@ rs__prehook <- function(stdout, stderr) { }, list(`__fn__` = stderr) ) + } substitute( { @@ -850,18 +895,20 @@ rs__prehook <- function(stdout, stderr) { } rs__posthook <- function(stdout, stderr) { - oexpr <- if (!is.null(stdout)) + oexpr <- if (!is.null(stdout)) { substitute({ as.environment("tools:callr")$`__callr_data__`$pxlib$set_stdout( as.environment("tools:callr")$`__callr_data__`$.__stdout__ ) }) - eexpr <- if (!is.null(stderr)) + } + eexpr <- if (!is.null(stderr)) { substitute({ as.environment("tools:callr")$`__callr_data__`$pxlib$set_stderr( as.environment("tools:callr")$`__callr_data__`$.__stderr__ ) }) + } substitute( { diff --git a/R/result.R b/R/result.R index 86629c2d..dedac755 100644 --- a/R/result.R +++ b/R/result.R @@ -18,7 +18,9 @@ get_result <- function(output, options) { res <- options$result_file ## Timeout? - if (output$timeout) throw(new_callr_crash_error(output)) + if (output$timeout) { + throw(new_callr_crash_error(output)) + } ## No output file and no error file? Some other (system?) error then, ## unless exit status was zero, which is probably just quit(). diff --git a/R/rscript.R b/R/rscript.R index 7fe2713e..983fd966 100644 --- a/R/rscript.R +++ b/R/rscript.R @@ -50,13 +50,17 @@ rscript <- function( } rscript_load_hook_color <- function(color) { - if (!color) return("") + if (!color) { + return("") + } nc <- tryCatch( cli::num_ansi_colors(), error = function(e) 1L ) - if (nc == 1) return("") + if (nc == 1) { + return("") + } expr <- substitute( options(crayon.enabled = TRUE, crayon.colors = `_nc_`), diff --git a/R/script.R b/R/script.R index 0abe3eb9..4c919fd5 100644 --- a/R/script.R +++ b/R/script.R @@ -75,7 +75,9 @@ make_vanilla_script_expr <- function( message <- function() { substitute({ pxlib <- base::as.environment("tools:callr")$`__callr_data__`$pxlib - if (base::is.null(e$code)) e$code <- "301" + if (base::is.null(e$code)) { + e$code <- "301" + } msg <- base::paste0( "base64::", pxlib$base64_encode(base::serialize(e, NULL)) diff --git a/R/setup.R b/R/setup.R index 1e018a98..1920f465 100644 --- a/R/setup.R +++ b/R/setup.R @@ -30,7 +30,9 @@ transport_fun <- function( package, source_refs = getOption("callr.keep.source") ) { - if (!isTRUE(source_refs)) fun <- remove_source(fun) + if (!isTRUE(source_refs)) { + fun <- remove_source(fun) + } if (isTRUE(package)) { # Do nothing @@ -87,15 +89,28 @@ setup_context <- function(options) { env[save_nms[save_set]] <- Sys.getenv(save_env[save_set]) env <- env[setdiff(names(env), save_nms[!keep_set & !save_set])] - if (is.na(env["R_ENVIRON"])) env["R_ENVIRON"] <- envs[[1]] - if (is.na(env["R_ENVIRON_USER"])) env["R_ENVIRON_USER"] <- envs[[2]] - if (is.na(env["R_PROFILE"])) env["R_PROFILE"] <- profiles[[1]] - if (is.na(env["R_PROFILE_USER"])) env["R_PROFILE_USER"] <- profiles[[2]] + if (is.na(env["R_ENVIRON"])) { + env["R_ENVIRON"] <- envs[[1]] + } + if (is.na(env["R_ENVIRON_USER"])) { + env["R_ENVIRON_USER"] <- envs[[2]] + } + if (is.na(env["R_PROFILE"])) { + env["R_PROFILE"] <- profiles[[1]] + } + if (is.na(env["R_PROFILE_USER"])) { + env["R_PROFILE_USER"] <- profiles[[2]] + } - if (is.na(env["R_LIBS"])) env["R_LIBS"] <- make_path(libpath) - if (is.na(env["R_LIBS_USER"])) env["R_LIBS_USER"] <- make_path(libpath) - if (is.na(env["R_LIBS_SITE"])) + if (is.na(env["R_LIBS"])) { + env["R_LIBS"] <- make_path(libpath) + } + if (is.na(env["R_LIBS_USER"])) { + env["R_LIBS_USER"] <- make_path(libpath) + } + if (is.na(env["R_LIBS_SITE"])) { env["R_LIBS_SITE"] <- make_path(.Library.site) + } env["CALLR_IS_RUNNING"] <- "true" }) @@ -132,10 +147,14 @@ make_profiles <- function(system, user, repos, libpath, load_hook, env) { if (file.exists(local)) user <- local else user <- NA_character_ } else if (user) { user <- env["R_PROFILE_USER"] - if (is.na(user)) user <- Sys.getenv("R_PROFILE_USER", NA_character_) + if (is.na(user)) { + user <- Sys.getenv("R_PROFILE_USER", NA_character_) + } local <- ".Rprofile" home <- path.expand("~/.Rprofile") - if (is.na(user) && file.exists(local)) user <- local + if (is.na(user) && file.exists(local)) { + user <- local + } if (is.na(user) && file.exists(home)) user <- home } else { user <- NA_character_ @@ -216,19 +235,29 @@ make_environ <- function(profiles, libpath, env) { } sys <- env["R_ENVIRON"] - if (is.na(sys)) sys <- Sys.getenv("R_ENVIRON", NA_character_) - if (is.na(sys)) sys <- file.path(R.home("etc"), "Renviron.site") + if (is.na(sys)) { + sys <- Sys.getenv("R_ENVIRON", NA_character_) + } + if (is.na(sys)) { + sys <- file.path(R.home("etc"), "Renviron.site") + } if (!is.na(sys) && file.exists(sys)) { file.append(env_sys, sys) cat("\n", file = env_sys, append = TRUE) } user <- env["R_ENVIRON_USER"] - if (is.na(user)) user <- Sys.getenv("R_ENVIRON_USER", NA_character_) + if (is.na(user)) { + user <- Sys.getenv("R_ENVIRON_USER", NA_character_) + } local <- ".Renviron" home <- "~/.Renviron" - if (is.na(user) && file.exists(local)) user <- local - if (is.na(user) && file.exists(home)) user <- home + if (is.na(user) && file.exists(local)) { + user <- local + } + if (is.na(user) && file.exists(home)) { + user <- home + } if (!is.na(user) && file.exists(user)) { file.append(env_user, user) cat("\n", file = env_user, append = TRUE) @@ -292,8 +321,9 @@ setup_callbacks <- function(options) { options <- append( options, list( - "real_block_callback" = if (!is.null(block_cb)) + "real_block_callback" = if (!is.null(block_cb)) { function(x, proc) block_cb(x) + } ) ) @@ -302,11 +332,15 @@ setup_callbacks <- function(options) { force(stream) ## In case there is no output, we create an empty file here - if (!is.null(stream) && stream != "2>&1") cat("", file = stream) + if (!is.null(stream) && stream != "2>&1") { + cat("", file = stream) + } if (!is.null(cb)) { function(x, proc) { - if (!is.null(stream)) cat(x, file = stream, sep = "\n", append = TRUE) + if (!is.null(stream)) { + cat(x, file = stream, sep = "\n", append = TRUE) + } cb(x) } } else { diff --git a/R/standalone-errors.R b/R/standalone-errors.R index 28a6b3a6..96a97ffd 100644 --- a/R/standalone-errors.R +++ b/R/standalone-errors.R @@ -265,16 +265,24 @@ err <- local({ always_trace <- isTRUE(getOption("rlib_error_always_trace")) .hide_from_trace <- 1L # .error_frame <- cond - if (!always_trace) signalCondition(cond) + if (!always_trace) { + signalCondition(cond) + } - if (is.null(cond$`_pid`)) cond$`_pid` <- Sys.getpid() - if (is.null(cond$`_timestamp`)) cond$`_timestamp` <- Sys.time() + if (is.null(cond$`_pid`)) { + cond$`_pid` <- Sys.getpid() + } + if (is.null(cond$`_timestamp`)) { + cond$`_timestamp` <- Sys.time() + } # If we get here that means that the condition was not caught by # an exiting handler. That means that we need to create a trace. # If there is a hand-constructed trace already in the error object, # then we'll just leave it there. - if (is.null(cond$trace)) cond <- add_trace_back(cond, frame = frame) + if (is.null(cond$trace)) { + cond <- add_trace_back(cond, frame = frame) + } # Set up environment to store .Last.error, it will be just before # baseenv(), so it is almost as if it was in baseenv() itself, like @@ -291,11 +299,15 @@ err <- local({ env$.Last.error.trace <- cond$trace # If we always wanted a trace, then we signal the condition here - if (always_trace) signalCondition(cond) + if (always_trace) { + signalCondition(cond) + } # If this is not an error, then we'll just return here. This allows # throwing interrupt conditions for example, with the same UI. - if (!inherits(cond, "error")) return(invisible()) + if (!inherits(cond, "error")) { + return(invisible()) + } .hide_from_trace <- NULL # Top-level handler, this is intended for testing only for now, @@ -549,8 +561,12 @@ err <- local({ for (start in hide_from) { hide_this <- invisible_frames[[funs[start]]] for (i in seq_along(hide_this)) { - if (start + i > length(funs)) break - if (funs[start + i] != hide_this[i]) break + if (start + i > length(funs)) { + break + } + if (funs[start + i] != hide_this[i]) { + break + } visibles[start + i] <- FALSE } } @@ -584,20 +600,35 @@ err <- local({ } get_call_scope <- function(call, ns) { - if (is.na(ns)) return("global") - if (!is.call(call)) return("") + if (is.na(ns)) { + return("global") + } + if (!is.call(call)) { + return("") + } if ( is.call(call[[1]]) && (call[[1]][[1]] == quote(`::`) || call[[1]][[1]] == quote(`:::`)) - ) + ) { + return("") + } + if (ns == "base") { + return("::") + } + if (!ns %in% loadedNamespaces()) { return("") - if (ns == "base") return("::") - if (!ns %in% loadedNamespaces()) return("") + } name <- call_name(call) - if (!ns %in% loadedNamespaces()) return("::") + if (!ns %in% loadedNamespaces()) { + return("::") + } nsenv <- asNamespace(ns)$.__NAMESPACE__. - if (is.null(nsenv)) return("::") - if (is.null(nsenv$exports)) return(":::") + if (is.null(nsenv)) { + return("::") + } + if (is.null(nsenv$exports)) { + return(":::") + } if (exists(name, envir = nsenv$exports, inherits = FALSE)) { "::" } else if (exists(name, envir = asNamespace(ns), inherits = FALSE)) { @@ -799,7 +830,9 @@ err <- local({ conditionMessage(cond$parent) } add_exp <- substr(cli::ansi_strip(msg[1]), 1, 1) != "!" - if (add_exp) msg[1] <- paste0(exp, msg[1]) + if (add_exp) { + msg[1] <- paste0(exp, msg[1]) + } c(format_header_line_cli(cond$parent, prefix = "Caused by error"), msg) } ) @@ -899,14 +932,18 @@ err <- local({ NULL } else { cl <- trimws(format(call)) - if (length(cl) > 1) cl <- paste0(cl[1], " ", cli::symbol$ellipsis) + if (length(cl) > 1) { + cl <- paste0(cl[1], " ", cli::symbol$ellipsis) + } cli::format_inline("{.code {cl}}") } } format_srcref_cli <- function(call, srcref = NULL) { ref <- get_srcref(call, srcref) - if (is.null(ref)) return("") + if (is.null(ref)) { + return("") + } link <- if (ref$file != "") { if (Sys.getenv("R_CLI_HYPERLINK_STYLE") == "iterm") { @@ -952,11 +989,12 @@ err <- local({ srcref <- if ("srcref" %in% names(x) || "procsrcref" %in% names(x)) { vapply( seq_len(nrow(x)), - function(i) + function(i) { format_srcref_cli( x[["call"]][[i]], x$procsrcref[[i]] %||% x$srcref[[i]] - ), + ) + }, character(1) ) } else { @@ -988,7 +1026,9 @@ err <- local({ format_trace_call_cli <- function(call, ns = "") { envir <- tryCatch( { - if (!ns %in% loadedNamespaces()) stop("no") + if (!ns %in% loadedNamespaces()) { + stop("no") + } asNamespace(ns) }, error = function(e) .GlobalEnv @@ -1046,11 +1086,12 @@ err <- local({ srcref <- if ("srcref" %in% names(x) || "procsrfref" %in% names(x)) { vapply( seq_len(nrow(x)), - function(i) + function(i) { format_srcref_plain( x[["call"]][[i]], x$procsrcref[[i]] %||% x$srcref[[i]] - ), + ) + }, character(1) ) } else { @@ -1101,14 +1142,18 @@ err <- local({ NULL } else { cl <- trimws(format(call)) - if (length(cl) > 1) cl <- paste0(cl[1], " ...") + if (length(cl) > 1) { + cl <- paste0(cl[1], " ...") + } paste0("`", cl, "`") } } format_srcref_plain <- function(call, srcref = NULL) { ref <- get_srcref(call, srcref) - if (is.null(ref)) return("") + if (is.null(ref)) { + return("") + } link <- if (ref$file != "") { paste0(basename(ref$file), ":", ref$line, ":", ref$col) @@ -1161,10 +1206,16 @@ err <- local({ get_srcref <- function(call, srcref = NULL) { ref <- srcref %||% utils::getSrcref(call) - if (is.null(ref)) return(NULL) - if (inherits(ref, "processed_srcref")) return(ref) + if (is.null(ref)) { + return(NULL) + } + if (inherits(ref, "processed_srcref")) { + return(ref) + } file <- utils::getSrcFilename(ref, full.names = TRUE)[1] - if (is.na(file)) file <- "" + if (is.na(file)) { + file <- "" + } line <- utils::getSrcLocation(ref) %||% "" col <- utils::getSrcLocation(ref, which = "column") %||% "" structure( diff --git a/R/test-helpers.R b/R/test-helpers.R index 37fb88ab..00ef8327 100644 --- a/R/test-helpers.R +++ b/R/test-helpers.R @@ -1,7 +1,9 @@ is_true_check_env_var <- function(x, default = "") { # like utils:::str2logical val <- Sys.getenv(x, default) - if (isTRUE(as.logical(val))) return(TRUE) + if (isTRUE(as.logical(val))) { + return(TRUE) + } tolower(val) %in% c("1", "yes") } @@ -12,13 +14,17 @@ isFALSE <- function(x) { is_false_check_env_var <- function(x, default = "") { # like utils:::str2logical val <- Sys.getenv(x, default) - if (isFALSE(as.logical(val))) return(TRUE) + if (isFALSE(as.logical(val))) { + return(TRUE) + } tolower(val) %in% c("0", "no") } # Only skip if _R_CHECK_FORCE_SUGGESTS_ is false skip_if_not_installed <- function(pkg) { - if (!is_false_check_env_var("_R_CHECK_FORCE_SUGGESTS_")) return() + if (!is_false_check_env_var("_R_CHECK_FORCE_SUGGESTS_")) { + return() + } testthat::skip_if_not_installed(pkg) } diff --git a/R/utils.R b/R/utils.R index f53f9e05..042b057d 100644 --- a/R/utils.R +++ b/R/utils.R @@ -17,7 +17,9 @@ default_repos <- function() { if (!"CRAN" %in% names(opt) || opt[["CRAN"]] == "@CRAN@") { opt[["CRAN"]] <- "https://cloud.r-project.org" } - if (!was_list) opt <- unlist(opt) + if (!was_list) { + opt <- unlist(opt) + } opt } @@ -44,7 +46,9 @@ is.named <- function(x) { } set_envvar <- function(envs) { - if (length(envs) == 0) return() + if (length(envs) == 0) { + return() + } stopifnot(is.named(envs)) @@ -53,8 +57,12 @@ set_envvar <- function(envs) { both_set <- set & !is.na(old) - if (any(set)) do.call("Sys.setenv", as.list(envs[set])) - if (any(!set)) Sys.unsetenv(names(envs)[!set]) + if (any(set)) { + do.call("Sys.setenv", as.list(envs[set])) + } + if (any(!set)) { + Sys.unsetenv(names(envs)[!set]) + } invisible(old) } @@ -114,7 +122,9 @@ read_all <- function(filename) { is_complete_expression <- function(x) { err <- NULL tryCatch(parse(text = x), error = function(e) err <<- e) - if (is.null(err)) return(TRUE) + if (is.null(err)) { + return(TRUE) + } exp <- tryCatch(parse(text = "1+"), error = function(e) e$message) exp1 <- strsplit(exp, "\n")[[1]][[1]] msg <- sub("^.*:\\s*([^:]+)$", "\\1", exp1, perl = TRUE) diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R index abd53f99..7dd6d04c 100644 --- a/tests/testthat/helper.R +++ b/tests/testthat/helper.R @@ -152,7 +152,9 @@ test_package_root <- function() { error = function(e) NULL ) - if (!is.null(x)) return(x) + if (!is.null(x)) { + return(x) + } pkg <- testthat::testing_package() x <- tryCatch( @@ -162,7 +164,9 @@ test_package_root <- function() { error = function(e) NULL ) - if (!is.null(x)) return(x) + if (!is.null(x)) { + return(x) + } stop("Cannot find package root") } diff --git a/tests/testthat/test-archs.R b/tests/testthat/test-archs.R index 67a7c31f..9250e0ef 100644 --- a/tests/testthat/test-archs.R +++ b/tests/testthat/test-archs.R @@ -1,7 +1,9 @@ test_that("r() to the other arch", { skip_on_cran() archs <- supported_archs() - if (length(archs) < 1) return(expect_true(TRUE)) + if (length(archs) < 1) { + return(expect_true(TRUE)) + } ret <- unlist(lapply( archs, function(a) r(function() .Platform$r_arch, arch = a) @@ -12,13 +14,19 @@ test_that("r() to the other arch", { test_that("r_bg() to the other arch", { skip_on_cran() archs <- supported_archs() - if (length(archs) < 1) return(expect_true(TRUE)) + if (length(archs) < 1) { + return(expect_true(TRUE)) + } procs <- lapply(archs, function(a) { r_bg(function() .Platform$r_arch, arch = a) }) on.exit(lapply(procs, function(p) p$kill()), add = TRUE) - for (p in procs) p$wait(3000) - for (p in procs) expect_false(p$is_alive()) + for (p in procs) { + p$wait(3000) + } + for (p in procs) { + expect_false(p$is_alive()) + } res <- unlist(lapply(procs, function(p) p$get_result())) expect_equal(res, archs) }) @@ -26,7 +34,9 @@ test_that("r_bg() to the other arch", { test_that("r_process to the other arch", { skip_on_cran() archs <- supported_archs() - if (length(archs) < 1) return(expect_true(TRUE)) + if (length(archs) < 1) { + return(expect_true(TRUE)) + } procs <- lapply(archs, function(a) { opts <- r_process_options( func = function() .Platform$r_arch, @@ -35,8 +45,12 @@ test_that("r_process to the other arch", { r_process$new(opts) }) on.exit(lapply(procs, function(p) p$kill()), add = TRUE) - for (p in procs) p$wait(3000) - for (p in procs) expect_false(p$is_alive()) + for (p in procs) { + p$wait(3000) + } + for (p in procs) { + expect_false(p$is_alive()) + } res <- unlist(lapply(procs, function(p) p$get_result())) expect_equal(res, archs) }) @@ -44,7 +58,9 @@ test_that("r_process to the other arch", { test_that("r_session to the other arch", { skip_on_cran() archs <- supported_archs() - if (length(archs) < 1) return(expect_true(TRUE)) + if (length(archs) < 1) { + return(expect_true(TRUE)) + } ret <- unlist(lapply(archs, function(a) { opts <- r_session_options(arch = a) rs <- r_session$new(opts) diff --git a/tests/testthat/test-clean-subprocess.R b/tests/testthat/test-clean-subprocess.R index 98fd5b82..e1a044ed 100644 --- a/tests/testthat/test-clean-subprocess.R +++ b/tests/testthat/test-clean-subprocess.R @@ -5,7 +5,9 @@ test_that("r() does not load anything", { clean_envvars(), r(without_env(function() loadedNamespaces())) ) - if (length(pkgs) > 1) print(pkgs) + if (length(pkgs) > 1) { + print(pkgs) + } ## Some R versions still load compiler... expect_true(all(pkgs %in% c("base", "compiler"))) }) @@ -20,7 +22,9 @@ test_that("r_bg() does not load anything", { on.exit(p$kill(), add = TRUE) p$wait(3000) pkgs <- p$get_result() - if (length(pkgs) > 1) print(pkgs) + if (length(pkgs) > 1) { + print(pkgs) + } ## Some R versions still load compiler... expect_true(all(pkgs %in% c("base", "compiler"))) }) @@ -31,7 +35,9 @@ test_that("r_session does not load anything", { rs <- withr::with_envvar(clean_envvars(), r_session$new()) on.exit(rs$close(), add = TRUE) pkgs <- rs$run(without_env(function() loadedNamespaces())) - if (length(pkgs) > 1) print(pkgs) + if (length(pkgs) > 1) { + print(pkgs) + } ## Some R versions still load compiler... expect_true(all(pkgs %in% c("base", "compiler"))) gc() diff --git a/tests/testthat/test-eval.R b/tests/testthat/test-eval.R index 84508712..ba8a585d 100644 --- a/tests/testthat/test-eval.R +++ b/tests/testthat/test-eval.R @@ -269,7 +269,9 @@ test_that("local .Rprofile is not loaded recursively", { expr <- quote({ rprofile <- Sys.getenv("R_PROFILE_USER", "~/.Rprofile") - if (file.exists(rprofile)) source(rprofile) + if (file.exists(rprofile)) { + source(rprofile) + } rm(rprofile) aa <- 123 }) diff --git a/tests/testthat/test-libpath.R b/tests/testthat/test-libpath.R index 3488758d..1bd38977 100644 --- a/tests/testthat/test-libpath.R +++ b/tests/testthat/test-libpath.R @@ -189,7 +189,9 @@ test_that("libpath in system, if subprocess changes R_LIBS", { }) test_that("libpath in system, if subprocess changes R_LIBS #2", { - if (.Platform$OS.type != "unix") skip("Unix only") + if (.Platform$OS.type != "unix") { + skip("Unix only") + } dir.create(tmpkeep <- tempfile("keep")) on.exit(unlink(tmpkeep, recursive = TRUE), add = TRUE) diff --git a/tests/testthat/test-load-client.R b/tests/testthat/test-load-client.R index c6d8d7ea..d1b69647 100644 --- a/tests/testthat/test-load-client.R +++ b/tests/testthat/test-load-client.R @@ -108,7 +108,9 @@ test_that("init function of client lib is run", { test_that("CALLR_NO_TEMP_DLLS", { skip_on_cran() - if (.Platform$OS.type != "windows") skip("Windows only") + if (.Platform$OS.type != "windows") { + skip("Windows only") + } # If not set, then it should come from the temporary location withr::local_envvar(CALLR_NO_TEMP_DLLS = NA_character_) diff --git a/tests/testthat/test-options.R b/tests/testthat/test-options.R index feb05504..866451a3 100644 --- a/tests/testthat/test-options.R +++ b/tests/testthat/test-options.R @@ -1,16 +1,14 @@ test_that("error for unknown options", { expect_snapshot(error = TRUE, { r_process_options( - func = function() { - }, + func = function() {}, foo = "bar" ) }) expect_snapshot(error = TRUE, { r_process_options( - func = function() { - }, + func = function() {}, foo = "bar", bar = "foo" ) diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index e0603cd4..fef7cb77 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -240,7 +240,9 @@ test_that("exit", { ) deadline <- Sys.time() + 3 - while (rs$is_alive() && Sys.time() < deadline) Sys.sleep(0.05) + while (rs$is_alive() && Sys.time() < deadline) { + Sys.sleep(0.05) + } expect_true(Sys.time() < deadline) expect_false(rs$is_alive()) @@ -281,8 +283,12 @@ test_that("crash", { ## This is a race, and sometimes we don't get the stdout/stderr ## on Windows - if (os_platform() != "windows") expect_equal(res$stdout, "o\n") - if (os_platform() != "windows") expect_equal(substr(res$stderr, 1, 2), "e\n") + if (os_platform() != "windows") { + expect_equal(res$stdout, "o\n") + } + if (os_platform() != "windows") { + expect_equal(substr(res$stderr, 1, 2), "e\n") + } expect_false(rs$is_alive()) expect_equal(rs$get_state(), "finished") rs$close() @@ -320,10 +326,13 @@ test_that("traceback", { test_that("error in the load hook", { opts <- r_session_options(load_hook = quote(stop("oops"))) - expect_snapshot(error = TRUE, local({ - rs <- r_session$new(opts) - on.exit(rs$kill(), add = TRUE) - })) + expect_snapshot( + error = TRUE, + local({ + rs <- r_session$new(opts) + on.exit(rs$kill(), add = TRUE) + }) + ) rs2 <- r_session$new(opts, wait = FALSE) on.exit(rs2$kill(), add = TRUE) From 044ef3b4454a34e35990a77c0ff33e6cf9788a18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 15 Jun 2025 18:44:56 +0200 Subject: [PATCH 2/9] Basic OpenTelemetry instrumentation --- DESCRIPTION | 5 +++++ R/eval.R | 2 ++ R/r-process.R | 12 ++++++++++++ R/r-session.R | 25 ++++++++++++++++++++++++- R/rcmd.R | 2 ++ R/rscript.R | 2 ++ R/run.R | 5 +++++ 7 files changed, 52 insertions(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 5c34752b..696c6749 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -18,17 +18,22 @@ BugReports: https://github.com/r-lib/callr/issues Depends: R (>= 3.4) Imports: + otel, processx (>= 3.6.1), R6, utils Suggests: asciicast (>= 2.3.1), cli (>= 1.1.0), + otelsdk, ps, rprojroot, spelling, testthat (>= 3.2.0), withr (>= 2.3.0) +Remotes: + r-lib/otel, + r-lib/otelsdk Config/Needs/website: r-lib/asciicast, glue, diff --git a/R/eval.R b/R/eval.R index 70fdcad6..ebe866dd 100644 --- a/R/eval.R +++ b/R/eval.R @@ -201,6 +201,8 @@ r <- function( options <- setup_callbacks(options) options <- setup_r_binary_and_args(options) + otel::start_span("callr::r") + out <- run_r(options) get_result(output = out, options) diff --git a/R/r-process.R b/R/r-process.R index 9a82cd64..0d233a88 100644 --- a/R/r-process.R +++ b/R/r-process.R @@ -55,6 +55,13 @@ rp_init <- function(self, private, super, options) { options <- setup_context(options) options <- setup_r_binary_and_args(options) + otel_session <- otel::start_session( + "callr::r_process start", + attributes = otel::as_attributes(options) + ) + otel::log_debug("start r_process", attributes = otel::as_attributes(options)) + options$otel_session <- otel_session + private$options <- options with_envvar( @@ -80,8 +87,13 @@ rp_init <- function(self, private, super, options) { rp_get_result <- function(self, private) { if (self$is_alive()) { + private$options$otel_session$add_event( + "get_result", + attributes = list(done = FALSE) + ) throw(new_error("Still alive")) } + on.exit(private$options$otel_session$end(status_code = "auto"), add = TRUE) ## This is artificial... out <- list( diff --git a/R/r-session.R b/R/r-session.R index 26419e26..854d66fc 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -259,6 +259,13 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { options <- setup_context(options) options <- setup_r_binary_and_args(options, script_file = FALSE) + otel_session <- otel::start_session( + "callr::r_session start", + attributes = otel::as_attributes(options) + ) + otel::log_debug("start r_session", attributes = otel::as_attributes(options)) + options$otel_session <- otel_session + private$options <- options prepare_client_files() @@ -333,6 +340,10 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { } rs_read <- function(self, private) { + spn <- otel::start_span( + "r_session$read", + session = private$options$otel_session + ) if (!is.null(private$buffer)) { # There is a partial message in the buffer, try to finish it. out <- private$read_buffer() @@ -342,6 +353,7 @@ rs_read <- function(self, private) { } if (!length(out)) { if (processx::processx_conn_is_incomplete(private$pipe)) { + spn$set_attribute("message", FALSE) return() } if (self$is_alive()) { @@ -372,7 +384,15 @@ rs_read <- function(self, private) { ) } } - if (length(out)) private$parse_msg(out) + if (length(out)) { + spn$set_attribute("message", TRUE) + if (!is.null(out$header$code)) { + spn$set_attribute("status_code", out$header$code) + } + private$parse_msg(out) + } else { + spn$set_attribute("message", FALSE) + } } rs__read_buffer <- function(self, private) { @@ -439,6 +459,7 @@ rs__parse_header <- function(line) { } rs_close <- function(self, private, grace) { + otel::start_span("r_session$close", session = private$options$otel_session) processx::processx_conn_close(self$get_input_connection()) self$poll_process(grace) self$kill() @@ -451,10 +472,12 @@ rs_close <- function(self, private, grace) { processx::processx_conn_close(private$pipe) processx::processx_conn_close(self$get_output_connection()) processx::processx_conn_close(self$get_error_connection()) + private$options$otel_session$end() invisible() } rs_call <- function(self, private, func, args, package) { + otel::start_span("r_session$call", session = private$options$otel_session) ## We only allow a new command if the R session is idle. ## This allows keeping a clean state ## TODO: do we need a state at all? diff --git a/R/rcmd.R b/R/rcmd.R index c908cec7..70a42dd6 100644 --- a/R/rcmd.R +++ b/R/rcmd.R @@ -72,6 +72,8 @@ rcmd <- function( ## This cleans up everything... on.exit(unlink(options$tmp_files, recursive = TRUE), add = TRUE) + otel::start_span("callr::rcmd", attributes = otel::as_attributes(options)) + run_r(options) } diff --git a/R/rscript.R b/R/rscript.R index 983fd966..065ef4fd 100644 --- a/R/rscript.R +++ b/R/rscript.R @@ -46,6 +46,8 @@ rscript <- function( ## This cleans up everything... on.exit(unlink(options$tmp_files, recursive = TRUE), add = TRUE) + otel::start_span("callr::rscript", attributes = otel::as_attributes(options)) + invisible(run_r(options)) } diff --git a/R/run.R b/R/run.R index 72ba7937..71547277 100644 --- a/R/run.R +++ b/R/run.R @@ -12,6 +12,11 @@ run_r <- function(options) { (!is.null(stdout) && !is.null(stderr) && stdout == stderr) ) + otel::log_debug( + "start callr subprocess", + attributes = otel::as_attributes(options) + ) + res <- with( options, with_envvar( From 68647e80be2e9f19cced12ef9b23bf2613f26c0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 15 Jun 2025 23:11:15 +0200 Subject: [PATCH 3/9] Support passing OpenTelemetry context to subprocess --- R/eval.R | 7 ++++++- R/hook.R | 28 ++++++++++++++++++++++++++++ R/r-process.R | 7 ++++++- R/r-session.R | 7 ++++++- R/rcmd.R | 7 ++++++- R/rscript.R | 10 +++++++++- R/run.R | 5 +---- 7 files changed, 62 insertions(+), 9 deletions(-) diff --git a/R/eval.R b/R/eval.R index ebe866dd..36916e7c 100644 --- a/R/eval.R +++ b/R/eval.R @@ -201,7 +201,12 @@ r <- function( options <- setup_callbacks(options) options <- setup_r_binary_and_args(options) - otel::start_span("callr::r") + if (otel::is_tracing()) { + otel::start_span("callr::r", attributes = otel::as_attributes(options)) + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } out <- run_r(options) diff --git a/R/hook.R b/R/hook.R index c5241b1b..b35478fc 100644 --- a/R/hook.R +++ b/R/hook.R @@ -6,6 +6,34 @@ common_hook <- function() { detach("tools:callr") } env <- readRDS(`__envfile__`) + + # OpenTelemetry setup + if ( + nzchar(Sys.getenv("TRACEPARENT")) && + requireNamespace("otel", quietly = TRUE) + ) { + hdrs <- as.list(c( + traceparent = Sys.getenv("TRACEPARENT"), + tracestate = Sys.getenv("TRACESTATE"), + baggage = Sys.getenv("BAGGAGE") + )) + prtctx <- otel::extract_http_context(hdrs) + reg.finalizer( + env$`__callr_data__`, + function(e) e$otel_span$end(), + onexit = TRUE + ) + assign( + envir = env$`__callr_data__`, + "otel_span", + otel::start_span( + "callr subprocess", + options = list(parent = prtctx), + scope = NULL + ) + ) + } + do.call("attach", list(env, pos = length(search()), name = "tools:callr")) data <- env$`__callr_data__` data$pxlib <- data$load_client_lib( diff --git a/R/r-process.R b/R/r-process.R index 0d233a88..4bbb01c1 100644 --- a/R/r-process.R +++ b/R/r-process.R @@ -55,11 +55,16 @@ rp_init <- function(self, private, super, options) { options <- setup_context(options) options <- setup_r_binary_and_args(options) + if (otel::is_tracing()) { + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } otel_session <- otel::start_session( "callr::r_process start", attributes = otel::as_attributes(options) ) - otel::log_debug("start r_process", attributes = otel::as_attributes(options)) + otel::log_debug("start r_process", session = otel_session) options$otel_session <- otel_session private$options <- options diff --git a/R/r-session.R b/R/r-session.R index 854d66fc..e57c645a 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -263,7 +263,12 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { "callr::r_session start", attributes = otel::as_attributes(options) ) - otel::log_debug("start r_session", attributes = otel::as_attributes(options)) + if (otel::is_tracing()) { + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } + otel::log_debug("start r_session", session = otel_session) options$otel_session <- otel_session private$options <- options diff --git a/R/rcmd.R b/R/rcmd.R index 70a42dd6..d43c4379 100644 --- a/R/rcmd.R +++ b/R/rcmd.R @@ -72,7 +72,12 @@ rcmd <- function( ## This cleans up everything... on.exit(unlink(options$tmp_files, recursive = TRUE), add = TRUE) - otel::start_span("callr::rcmd", attributes = otel::as_attributes(options)) + if (otel::is_tracing()) { + otel::start_span("callr::rcmd", attributes = otel::as_attributes(options)) + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } run_r(options) } diff --git a/R/rscript.R b/R/rscript.R index 065ef4fd..a4ca6116 100644 --- a/R/rscript.R +++ b/R/rscript.R @@ -46,7 +46,15 @@ rscript <- function( ## This cleans up everything... on.exit(unlink(options$tmp_files, recursive = TRUE), add = TRUE) - otel::start_span("callr::rscript", attributes = otel::as_attributes(options)) + if (otel::is_tracing()) { + otel::start_span( + "callr::rscript", + attributes = otel::as_attributes(options) + ) + hdrs <- otel::pack_http_context() + names(hdrs) <- toupper(names(hdrs)) + options$env[names(hdrs)] <- hdrs + } invisible(run_r(options)) } diff --git a/R/run.R b/R/run.R index 71547277..de13ddee 100644 --- a/R/run.R +++ b/R/run.R @@ -12,10 +12,7 @@ run_r <- function(options) { (!is.null(stdout) && !is.null(stderr) && stdout == stderr) ) - otel::log_debug( - "start callr subprocess", - attributes = otel::as_attributes(options) - ) + otel::log_debug("start callr subprocess") res <- with( options, From d43d194ab94ce7e501b302f67ef1b83d071a34da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Tue, 17 Jun 2025 14:56:41 +0200 Subject: [PATCH 4/9] End otel session from the finalizer In case we end up killing the process. --- R/r-session.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/R/r-session.R b/R/r-session.R index e57c645a..afeb9edf 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -215,6 +215,8 @@ r_session <- R6::R6Class( private = list( finalize = function() { + private$options$otel_session$add_event("finalizer") + private$options$otel_session$end() unlink(private$tmp_output_file) unlink(private$tmp_error_file) unlink(private$options$tmp_files, recursive = TRUE) From 4d65b81cba884766c2aa78ab8a8a0a8f48978ffc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Tue, 17 Jun 2025 15:04:18 +0200 Subject: [PATCH 5/9] Otel improvements - better span names - better log messages - start session span before packing context, for `r_process` --- R/r-process.R | 10 +++++----- R/r-session.R | 4 ++-- R/run.R | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/R/r-process.R b/R/r-process.R index 4bbb01c1..49d1de82 100644 --- a/R/r-process.R +++ b/R/r-process.R @@ -55,16 +55,16 @@ rp_init <- function(self, private, super, options) { options <- setup_context(options) options <- setup_r_binary_and_args(options) + otel_session <- otel::start_session( + "callr::r_process", + attributes = otel::as_attributes(options) + ) + otel::log_debug("start r_process", session = otel_session) if (otel::is_tracing()) { hdrs <- otel::pack_http_context() names(hdrs) <- toupper(names(hdrs)) options$env[names(hdrs)] <- hdrs } - otel_session <- otel::start_session( - "callr::r_process start", - attributes = otel::as_attributes(options) - ) - otel::log_debug("start r_process", session = otel_session) options$otel_session <- otel_session private$options <- options diff --git a/R/r-session.R b/R/r-session.R index afeb9edf..87626f9d 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -262,7 +262,7 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { options <- setup_r_binary_and_args(options, script_file = FALSE) otel_session <- otel::start_session( - "callr::r_session start", + "callr::r_session", attributes = otel::as_attributes(options) ) if (otel::is_tracing()) { @@ -270,7 +270,7 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { names(hdrs) <- toupper(names(hdrs)) options$env[names(hdrs)] <- hdrs } - otel::log_debug("start r_session", session = otel_session) + otel::log_debug("callr::r_session start", session = otel_session) options$otel_session <- otel_session private$options <- options diff --git a/R/run.R b/R/run.R index de13ddee..49e62209 100644 --- a/R/run.R +++ b/R/run.R @@ -12,7 +12,7 @@ run_r <- function(options) { (!is.null(stdout) && !is.null(stderr) && stdout == stderr) ) - otel::log_debug("start callr subprocess") + otel::log_debug("callr start subprocess") res <- with( options, From 793402c13505072057bb4c13e45bf2cea256919f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Wed, 18 Jun 2025 11:46:40 +0200 Subject: [PATCH 6/9] Update for otel API changes Cannot set the scope to NULL now. --- R/hook.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/hook.R b/R/hook.R index b35478fc..3c88c3e1 100644 --- a/R/hook.R +++ b/R/hook.R @@ -29,7 +29,7 @@ common_hook <- function() { otel::start_span( "callr subprocess", options = list(parent = prtctx), - scope = NULL + scope = .GlobalEnv ) ) } From 4bae8d2f4d0814050eb9ccaa5b13de2efc6e4424 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Wed, 18 Jun 2025 13:16:43 +0200 Subject: [PATCH 7/9] Update for otel API changes --- R/r-process.R | 2 +- R/r-session.R | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/R/r-process.R b/R/r-process.R index 49d1de82..31ed421c 100644 --- a/R/r-process.R +++ b/R/r-process.R @@ -59,7 +59,7 @@ rp_init <- function(self, private, super, options) { "callr::r_process", attributes = otel::as_attributes(options) ) - otel::log_debug("start r_process", session = otel_session) + otel::log_debug("start r_process") if (otel::is_tracing()) { hdrs <- otel::pack_http_context() names(hdrs) <- toupper(names(hdrs)) diff --git a/R/r-session.R b/R/r-session.R index 87626f9d..e225f98a 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -270,7 +270,7 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { names(hdrs) <- toupper(names(hdrs)) options$env[names(hdrs)] <- hdrs } - otel::log_debug("callr::r_session start", session = otel_session) + otel::log_debug("callr::r_session start") options$otel_session <- otel_session private$options <- options @@ -303,7 +303,7 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { private$state <- "starting" if (wait) { - otel::start_span("r_session$initialize() wait", session = otel_session) + otel::start_span("r_session$initialize() wait") timeout <- wait_timeout have_until <- Sys.time() + as.difftime(timeout / 1000, units = "secs") pr <- self$poll_io(timeout) @@ -347,10 +347,8 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { } rs_read <- function(self, private) { - spn <- otel::start_span( - "r_session$read", - session = private$options$otel_session - ) + otel::local_session(private$options$otel_session) + spn <- otel::start_span("r_session$read") if (!is.null(private$buffer)) { # There is a partial message in the buffer, try to finish it. out <- private$read_buffer() @@ -466,7 +464,8 @@ rs__parse_header <- function(line) { } rs_close <- function(self, private, grace) { - otel::start_span("r_session$close", session = private$options$otel_session) + otel::local_session(private$options$otel_session) + otel::start_span("r_session$close") processx::processx_conn_close(self$get_input_connection()) self$poll_process(grace) self$kill() @@ -484,7 +483,8 @@ rs_close <- function(self, private, grace) { } rs_call <- function(self, private, func, args, package) { - otel::start_span("r_session$call", session = private$options$otel_session) + otel::local_session(private$options$otel_session) + otel::start_span("r_session$call") ## We only allow a new command if the R session is idle. ## This allows keeping a clean state ## TODO: do we need a state at all? From 0644346e8e777a3312612c61112c81c9a005520f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Wed, 18 Jun 2025 14:42:16 +0200 Subject: [PATCH 8/9] Add some otel events --- R/hook.R | 10 +++++----- R/script.R | 12 +++++++++++- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/R/hook.R b/R/hook.R index 3c88c3e1..780d7e01 100644 --- a/R/hook.R +++ b/R/hook.R @@ -8,10 +8,10 @@ common_hook <- function() { env <- readRDS(`__envfile__`) # OpenTelemetry setup - if ( - nzchar(Sys.getenv("TRACEPARENT")) && - requireNamespace("otel", quietly = TRUE) - ) { + has_otel <- nzchar(Sys.getenv("TRACEPARENT")) && + requireNamespace("otel", quietly = TRUE) + assign(envir = env$`__callr_data__`, "has_otel", has_otel) + if (has_otel) { hdrs <- as.list(c( traceparent = Sys.getenv("TRACEPARENT"), tracestate = Sys.getenv("TRACESTATE"), @@ -41,7 +41,7 @@ common_hook <- function() { data$pxdir ) options(error = function() invokeRestart("abort")) - rm(list = c("data", "env")) + rm(list = c("data", "env", "has_otel")) lapply( c( diff --git a/R/script.R b/R/script.R index 4c919fd5..95e8aa16 100644 --- a/R/script.R +++ b/R/script.R @@ -43,6 +43,9 @@ make_vanilla_script_expr <- function( e2$trace <- e2$trace[-(1:cut), ] } + if (callr_data$has_otel) { + callr_data$otel_span$record_exception(e2) + } base::saveRDS( base::list("error", e2, e), file = base::paste0(`__res__`, ".error") @@ -74,7 +77,8 @@ make_vanilla_script_expr <- function( if (messages) { message <- function() { substitute({ - pxlib <- base::as.environment("tools:callr")$`__callr_data__`$pxlib + callr_data <- base::as.environment("tools:callr")$`__callr_data__` + pxlib <- callr_data$pxlib if (base::is.null(e$code)) { e$code <- "301" } @@ -83,6 +87,12 @@ make_vanilla_script_expr <- function( pxlib$base64_encode(base::serialize(e, NULL)) ) data <- base::paste0(e$code, " ", base::nchar(msg), "\n", msg) + if (callr_data$has_otel) { + callr$data$otel_span$add_event( + "callr message", + attributes = list(status_code = e$code) + ) + } pxlib$write_fd(3L, data) if ( From 8cdc781d27d3e991a14d223c0551511b717e5842 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Thu, 19 Jun 2025 13:52:16 +0200 Subject: [PATCH 9/9] Set otel_tracer_name --- R/callr-package.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/R/callr-package.R b/R/callr-package.R index dbd2579d..93ffca7b 100644 --- a/R/callr-package.R +++ b/R/callr-package.R @@ -13,6 +13,8 @@ #' @keywords internal "_PACKAGE" +otel_tracer_name <- "org.r-lib.callr" + ## usethis namespace: start ## usethis namespace: end NULL