From fe4226212d319be1cfa8c00a33723d67eefc1d5b Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Mon, 12 Jan 2026 13:08:38 +0000 Subject: [PATCH] Fix unkillable test jobs while waiting for dependencies When a test job was waiting for its dependencies to complete (in `await_dep_success`), it did not monitor its cancellation token. This meant that if the process received a SIGINT (Ctrl+C), the waiting job would continue to block until all its dependencies finished (or timed out after their own grace period). This commit modifies `await_dep_success` to use `tokio::select!` and race the dependency completion against the cancellation token. If the token is cancelled, the wait is aborted immediately, returning `TestInconclusive::Canceled`. This ensures that the `limmat` process can shut down more responsively, even if dependency jobs are taking time to clean up. Wait logic in `await_dep_success` was refactored to return `Result` to differentiate between dependency failure and cancellation. Added a regression test `should_cancel_waiting_for_deps` which simulates a slow-to-cancel dependency and asserts that the waiting job cancels immediately. Fixes #36 --- src/test.rs | 107 ++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 99 insertions(+), 8 deletions(-) diff --git a/src/test.rs b/src/test.rs index 4757638..14a3e75 100644 --- a/src/test.rs +++ b/src/test.rs @@ -602,6 +602,11 @@ pub struct TestJob { pub type DepDatabaseEntries = HashMap>; +enum DepWaitError { + DependencyFailed(TestName), + Canceled, +} + impl<'a> TestJob { pub fn subscribe_completion(&self) -> broadcast::Receiver { self.notifier.subscribe_completion() @@ -635,10 +640,13 @@ impl<'a> TestJob { ) -> TestOutcome { // Wait for dependencies do be done, bail early if they do anything // but terminate successfully. - let dep_db_entries = self - .await_dep_success() - .await - .map_err(|test_name| anyhow!("dependency job {test_name} failed"))?; + let dep_db_entries = match self.await_dep_success().await { + Ok(e) => e, + Err(DepWaitError::DependencyFailed(test_name)) => { + return Err(anyhow!("dependency job {test_name} failed").into()) + } + Err(DepWaitError::Canceled) => return Err(TestInconclusive::Canceled), + }; // Throttle to avoid opening zillions of database entries (probably // generally to avoid other resource exhaustions too). @@ -695,7 +703,7 @@ impl<'a> TestJob { // Blocks until all dependency jobs have succeeded and returns all the // database entries containing their results, or returns an error reporting // the name of the job that terminated without success. - async fn await_dep_success(&mut self) -> Result { + async fn await_dep_success(&mut self) -> Result { // This is another thing where the tokio::sync::watch API is a bit // weird, there's no way to wait for a message without passing a // predicate, so we have to pass a dummy one. @@ -707,8 +715,15 @@ impl<'a> TestJob { let mut wait_for: Vec<_> = wait_for.into_iter().map(Box::pin).collect(); let mut ret = HashMap::new(); while !wait_for.is_empty() { - let ((test_name, outcome), _idx, remaining) = select_all(wait_for).await; - wait_for = remaining; + let (test_name, outcome) = select! { + biased; + _ = self.ct.cancelled() => return Err(DepWaitError::Canceled), + ((test_name, outcome), _idx, remaining) = select_all(wait_for) => { + wait_for = remaining; + (test_name, outcome) + } + }; + // We are squashing lots of different types of failures and aborts // (including the "impossible" case that the sender has been dropped // and the rx.wait_for call failed) here, we trust that the other @@ -727,7 +742,7 @@ impl<'a> TestJob { "Dependency {:?} of {:?} failed: {:?}", test_name, self.test_case.test.name, outcome ); - return Err(test_name.clone()); + return Err(DepWaitError::DependencyFailed(test_name.clone())); } debug!("{:?}: Dependencies succeeded", self.test_case); Ok(ret) @@ -1153,6 +1168,7 @@ mod tests { // We would like this to be an OsStr but you can't do that according to // https://stackoverflow.com/questions/49226783/is-there-any-way-to-represent-an-osstr-or-osstring-literal pub const BLOCK_COMMIT_MSG_TAG: &'static str = "block_this_test"; + pub const SLOW_SIGTERM_TAG: &'static str = "slow_sigterm"; // Generate a tag which, when put in the commit message of a commit, will result in the test // returning the given exit code. @@ -1197,6 +1213,9 @@ mod tests { exit_code=$(echo \"$commit_msg\" | perl -n -e'/exit_code\\((\\d+)\\)/ && print $1') on_sigterm() {{ touch {sigtermed_path_prefix:?}$(git rev-parse $LIMMAT_COMMIT) + if [[ \"$commit_msg\" =~ {slow_sigterm_tag} ]]; then + sleep 2 + fi exit ${{exit_code:-0}} }} trap on_sigterm SIGTERM @@ -1235,6 +1254,7 @@ mod tests { lock_filename = if use_lockfile { Self::LOCK_FILENAME } else { "" }, bug_detected_path = dir.path().join(Self::BUG_DETECTED_PATH), block_tag = Self::BLOCK_COMMIT_MSG_TAG, + slow_sigterm_tag = Self::SLOW_SIGTERM_TAG, ); Self { @@ -2365,4 +2385,75 @@ mod tests { // TODO: Test that changes to dependency config hashes invalidates // result cache. } + + #[tokio::test] + async fn should_cancel_waiting_for_deps() { + let f = TestScriptFixture::builder() + .dependencies([(1, 0)]) + .num_worktrees(2) + .build() + .await; + + // Job 0 (dep) has SLOW_SIGTERM. + let mut msg = OsString::from(TestScript::BLOCK_COMMIT_MSG_TAG); + msg.push(" "); + msg.push(TestScript::SLOW_SIGTERM_TAG); + + let commit = f.repo.commit(msg).await.unwrap(); + + let mut results = f.manager.results(); + f.manager + .set_revisions(vec![commit.clone()]) + .await + .unwrap(); + + // Wait for Job 0 to start. + timeout_5s(f.scripts[0].started(&commit.hash)) + .await + .expect("Job 0 started"); + + // Job 1 should be waiting. + + // Start timing. + let start = Instant::now(); + + // Cancel. + f.manager.cancel_running().await.unwrap(); + + // Check results. Job 1 should be Canceled quickly. + let mut job1_done = false; + let mut job0_done = false; + // We might receive "Enqueued" or "Started" notifications before "Finished". + // We only care about Finished. + loop { + let notif = select! { + n = results.recv() => n.unwrap(), + _ = sleep(Duration::from_secs(5)) => panic!("Timed out waiting for results"), + }; + + if notif.test_case.test.name.to_string() == "test_1" { + if let TestStatus::Finished(res) = ¬if.status { + job1_done = true; + assert!( + start.elapsed() < Duration::from_secs(1), + "Job 1 took too long to cancel: {:?}", + start.elapsed() + ); + if let Err(TestInconclusive::Canceled) = res { + // Good + } else { + panic!("Job 1 did not cancel, got {:?}", res); + } + } + } + if notif.test_case.test.name.to_string() == "test_0" { + if let TestStatus::Finished(_) = ¬if.status { + job0_done = true; + } + } + if job1_done && job0_done { + break; + } + } + } }