diff --git a/tests/testthat/helper-sync.R b/tests/testthat/helper-sync.R index da6a3ab6..ab2428c7 100644 --- a/tests/testthat/helper-sync.R +++ b/tests/testthat/helper-sync.R @@ -14,19 +14,13 @@ sync_req <- function(name, .env = parent.frame()) { nanonext::pipe_notify(sock, cv, add = TRUE) nanonext::listen(sock, url = sprintf("ipc:///tmp/nanonext%s", name)) - function( - expr = {}, - timeout = 1000L - ) { + function(resp, timeout = 1000L) { if (!connected) { nanonext::until(cv, timeout) connected <<- TRUE } - ctx <- nanonext::context(sock) - saio <- nanonext::send_aio(ctx, 0L, mode = 2L) - expr - nanonext::call_aio(nanonext::recv_aio(ctx, mode = 8L, timeout = timeout)) - nanonext::msleep(50L) # wait, as nanonext messages can return faster than side effects (e.g. stream) + nanonext::send(sock, 0L, mode = 2L, block = timeout) + wait_for_http_data(resp, timeout / 1000) } } @@ -44,17 +38,40 @@ sync_rep <- function(name, .env = parent.frame()) { nanonext::pipe_notify(sock, cv, add = TRUE) nanonext::dial(sock, url = sprintf("ipc:///tmp/nanonext%s", name)) - function( - expr = {}, - timeout = 1000L - ) { + function(expr = {}, timeout = 1000L) { if (!connected) { nanonext::until(cv, timeout) connected <<- TRUE } - ctx <- nanonext::context(sock) - nanonext::call_aio(nanonext::recv_aio(ctx, mode = 8L, timeout = timeout)) + nanonext::recv(sock, mode = 8L, block = timeout) expr - nanonext::send(ctx, 0L, mode = 2L, block = TRUE) } } + +wait_for_http_data <- function(resp, timeout_s) { + if (resp$body$is_complete()) { + return(invisible(TRUE)) + } + + deadline <- as.double(Sys.time()) + timeout_s + poll_interval <- 0.01 + + while (as.double(Sys.time()) < deadline) { + chunk <- resp$body$read(256) + if (length(chunk) > 0) { + resp$cache$push_back <- c(resp$cache$push_back, chunk) + return(invisible(TRUE)) + } + + if (resp$body$is_complete()) { + return(invisible(TRUE)) + } + + remaining <- deadline - as.double(Sys.time()) + if (remaining > 0) { + Sys.sleep(poll_interval) + } + } + + invisible(FALSE) +} diff --git a/tests/testthat/test-resp-stream.R b/tests/testthat/test-resp-stream.R index 408df5aa..bc66fa45 100644 --- a/tests/testthat/test-resp-stream.R +++ b/tests/testthat/test-resp-stream.R @@ -83,7 +83,7 @@ test_that("can join lines across multiple reads", { out <- resp_stream_lines(resp1) expect_equal(out, character()) - sync() + sync(resp1) out <- resp_stream_lines(resp1) expect_equal(out, "This is a complete sentence.") }) @@ -121,7 +121,7 @@ test_that("handles line endings of multiple kinds", { for (expected in expected_values) { rlang::inject(expect_equal(resp_stream_lines(resp1), !!expected)) - sync() + sync(resp1) } expect_warning(out <- resp_stream_lines(resp1), "incomplete final line") expect_equal(out, "eof without line ending") @@ -233,11 +233,11 @@ test_that("can join sse events across multiple reads", { expect_equal(out, NULL) expect_equal(resp1$cache$push_back, charToRaw("data: 1\n")) - sync() + sync(resp1) out <- resp_stream_sse(resp1) expect_equal(out, NULL) - sync() + sync(resp1) out <- resp_stream_sse(resp1) expect_equal(out, list(type = "message", data = "1\n2", id = "")) expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n"))