From 3e25ae6ed0154db50c2c783590db382b6939f5bb Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Mon, 22 Dec 2025 20:34:39 +0000 Subject: [PATCH 1/3] Improve synchronization in streaming tests --- tests/testthat/helper-sync.R | 66 +++++++++++++++++++++++-------- tests/testthat/test-resp-stream.R | 8 ++-- 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/tests/testthat/helper-sync.R b/tests/testthat/helper-sync.R index da6a3ab6..2cc7bf69 100644 --- a/tests/testthat/helper-sync.R +++ b/tests/testthat/helper-sync.R @@ -1,6 +1,7 @@ sync_req <- function(name, .env = parent.frame()) { skip_on_cran() skip_if_not_installed("nanonext") + skip_if_not_installed("later") if (missing(name) || !is.character(name)) { cli::cli_abort( @@ -14,19 +15,13 @@ sync_req <- function(name, .env = parent.frame()) { nanonext::pipe_notify(sock, cv, add = TRUE) nanonext::listen(sock, url = sprintf("ipc:///tmp/nanonext%s", name)) - function( - expr = {}, - timeout = 1000L - ) { + function(resp, timeout = 1000L) { if (!connected) { nanonext::until(cv, timeout) connected <<- TRUE } - ctx <- nanonext::context(sock) - saio <- nanonext::send_aio(ctx, 0L, mode = 2L) - expr - nanonext::call_aio(nanonext::recv_aio(ctx, mode = 8L, timeout = timeout)) - nanonext::msleep(50L) # wait, as nanonext messages can return faster than side effects (e.g. stream) + nanonext::send(sock, 0L, mode = 2L, block = timeout) + wait_for_http_data(resp, timeout / 1000) } } @@ -44,17 +39,56 @@ sync_rep <- function(name, .env = parent.frame()) { nanonext::pipe_notify(sock, cv, add = TRUE) nanonext::dial(sock, url = sprintf("ipc:///tmp/nanonext%s", name)) - function( - expr = {}, - timeout = 1000L - ) { + function(expr = {}, timeout = 1000L) { if (!connected) { nanonext::until(cv, timeout) connected <<- TRUE } - ctx <- nanonext::context(sock) - nanonext::call_aio(nanonext::recv_aio(ctx, mode = 8L, timeout = timeout)) + nanonext::recv(sock, mode = 8L, block = timeout) expr - nanonext::send(ctx, 0L, mode = 2L, block = TRUE) } } + +wait_for_http_data <- function(resp, timeout_s) { + if (resp$body$is_complete()) { + return(invisible(TRUE)) + } + + deadline <- as.double(Sys.time()) + timeout_s + + while ((remaining <- deadline - as.double(Sys.time())) > 0) { + fdset <- resp$body$get_fdset() + if (length(fdset$reads) == 0) { + return(invisible(FALSE)) + } + + fd_ready <- FALSE + later::later_fd( + func = function(ready) { + fd_ready <<- any(ready, na.rm = TRUE) + }, + readfds = fdset$reads, + timeout = remaining + ) + later::run_now(remaining) + + if (!fd_ready) { + break + } # Timeout + + # Try to actually read data from FD + chunk <- resp$body$read(256) + + if (length(chunk) > 0) { + # Append new data to push_back so tests can read it + resp$cache$push_back <- c(resp$cache$push_back, chunk) + return(invisible(TRUE)) + } + + if (resp$body$is_complete()) { + return(invisible(TRUE)) + } + } + + invisible(FALSE) +} diff --git a/tests/testthat/test-resp-stream.R b/tests/testthat/test-resp-stream.R index 408df5aa..bc66fa45 100644 --- a/tests/testthat/test-resp-stream.R +++ b/tests/testthat/test-resp-stream.R @@ -83,7 +83,7 @@ test_that("can join lines across multiple reads", { out <- resp_stream_lines(resp1) expect_equal(out, character()) - sync() + sync(resp1) out <- resp_stream_lines(resp1) expect_equal(out, "This is a complete sentence.") }) @@ -121,7 +121,7 @@ test_that("handles line endings of multiple kinds", { for (expected in expected_values) { rlang::inject(expect_equal(resp_stream_lines(resp1), !!expected)) - sync() + sync(resp1) } expect_warning(out <- resp_stream_lines(resp1), "incomplete final line") expect_equal(out, "eof without line ending") @@ -233,11 +233,11 @@ test_that("can join sse events across multiple reads", { expect_equal(out, NULL) expect_equal(resp1$cache$push_back, charToRaw("data: 1\n")) - sync() + sync(resp1) out <- resp_stream_sse(resp1) expect_equal(out, NULL) - sync() + sync(resp1) out <- resp_stream_sse(resp1) expect_equal(out, list(type = "message", data = "1\n2", id = "")) expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n")) From 396a68cf9ef33538ae2614771cd1d34cc0d7005b Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 7 Jan 2026 15:01:30 +0000 Subject: [PATCH 2/3] Add read retry loop --- tests/testthat/helper-sync.R | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/tests/testthat/helper-sync.R b/tests/testthat/helper-sync.R index 2cc7bf69..344aa968 100644 --- a/tests/testthat/helper-sync.R +++ b/tests/testthat/helper-sync.R @@ -59,6 +59,9 @@ wait_for_http_data <- function(resp, timeout_s) { while ((remaining <- deadline - as.double(Sys.time())) > 0) { fdset <- resp$body$get_fdset() if (length(fdset$reads) == 0) { + if (resp$body$is_complete()) { + return(invisible(TRUE)) + } return(invisible(FALSE)) } @@ -74,19 +77,21 @@ wait_for_http_data <- function(resp, timeout_s) { if (!fd_ready) { break - } # Timeout - - # Try to actually read data from FD - chunk <- resp$body$read(256) - - if (length(chunk) > 0) { - # Append new data to push_back so tests can read it - resp$cache$push_back <- c(resp$cache$push_back, chunk) - return(invisible(TRUE)) } - if (resp$body$is_complete()) { - return(invisible(TRUE)) + # FD is ready - try reading with retries to allow curl to process the data + # later_fd fires when socket is readable, but curl may need time to process + retry_deadline <- as.double(Sys.time()) + 0.1 + while (as.double(Sys.time()) < retry_deadline) { + chunk <- resp$body$read(256) + if (length(chunk) > 0) { + resp$cache$push_back <- c(resp$cache$push_back, chunk) + return(invisible(TRUE)) + } + if (resp$body$is_complete()) { + return(invisible(TRUE)) + } + Sys.sleep(0.01) } } From 9dbf811802104f891d7fe0afa8b674df1fcf2503 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 7 Jan 2026 15:36:28 +0000 Subject: [PATCH 3/3] Simplify approach --- tests/testthat/helper-sync.R | 44 +++++++++--------------------------- 1 file changed, 11 insertions(+), 33 deletions(-) diff --git a/tests/testthat/helper-sync.R b/tests/testthat/helper-sync.R index 344aa968..ab2428c7 100644 --- a/tests/testthat/helper-sync.R +++ b/tests/testthat/helper-sync.R @@ -1,7 +1,6 @@ sync_req <- function(name, .env = parent.frame()) { skip_on_cran() skip_if_not_installed("nanonext") - skip_if_not_installed("later") if (missing(name) || !is.character(name)) { cli::cli_abort( @@ -55,43 +54,22 @@ wait_for_http_data <- function(resp, timeout_s) { } deadline <- as.double(Sys.time()) + timeout_s + poll_interval <- 0.01 - while ((remaining <- deadline - as.double(Sys.time())) > 0) { - fdset <- resp$body$get_fdset() - if (length(fdset$reads) == 0) { - if (resp$body$is_complete()) { - return(invisible(TRUE)) - } - return(invisible(FALSE)) + while (as.double(Sys.time()) < deadline) { + chunk <- resp$body$read(256) + if (length(chunk) > 0) { + resp$cache$push_back <- c(resp$cache$push_back, chunk) + return(invisible(TRUE)) } - fd_ready <- FALSE - later::later_fd( - func = function(ready) { - fd_ready <<- any(ready, na.rm = TRUE) - }, - readfds = fdset$reads, - timeout = remaining - ) - later::run_now(remaining) - - if (!fd_ready) { - break + if (resp$body$is_complete()) { + return(invisible(TRUE)) } - # FD is ready - try reading with retries to allow curl to process the data - # later_fd fires when socket is readable, but curl may need time to process - retry_deadline <- as.double(Sys.time()) + 0.1 - while (as.double(Sys.time()) < retry_deadline) { - chunk <- resp$body$read(256) - if (length(chunk) > 0) { - resp$cache$push_back <- c(resp$cache$push_back, chunk) - return(invisible(TRUE)) - } - if (resp$body$is_complete()) { - return(invisible(TRUE)) - } - Sys.sleep(0.01) + remaining <- deadline - as.double(Sys.time()) + if (remaining > 0) { + Sys.sleep(poll_interval) } }