Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 99 additions & 8 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,11 @@ pub struct TestJob {

pub type DepDatabaseEntries = HashMap<TestName, Arc<DatabaseEntry>>;

enum DepWaitError {
DependencyFailed(TestName),
Canceled,
}

impl<'a> TestJob {
pub fn subscribe_completion(&self) -> broadcast::Receiver<TestOutcome> {
self.notifier.subscribe_completion()
Expand Down Expand Up @@ -635,10 +640,13 @@ impl<'a> TestJob {
) -> TestOutcome {
// Wait for dependencies do be done, bail early if they do anything
// but terminate successfully.
let dep_db_entries = self
.await_dep_success()
.await
.map_err(|test_name| anyhow!("dependency job {test_name} failed"))?;
let dep_db_entries = match self.await_dep_success().await {
Ok(e) => e,
Err(DepWaitError::DependencyFailed(test_name)) => {
return Err(anyhow!("dependency job {test_name} failed").into())
}
Err(DepWaitError::Canceled) => return Err(TestInconclusive::Canceled),
};

// Throttle to avoid opening zillions of database entries (probably
// generally to avoid other resource exhaustions too).
Expand Down Expand Up @@ -695,7 +703,7 @@ impl<'a> TestJob {
// Blocks until all dependency jobs have succeeded and returns all the
// database entries containing their results, or returns an error reporting
// the name of the job that terminated without success.
async fn await_dep_success(&mut self) -> Result<DepDatabaseEntries, TestName> {
async fn await_dep_success(&mut self) -> Result<DepDatabaseEntries, DepWaitError> {
// This is another thing where the tokio::sync::watch API is a bit
// weird, there's no way to wait for a message without passing a
// predicate, so we have to pass a dummy one.
Expand All @@ -707,8 +715,15 @@ impl<'a> TestJob {
let mut wait_for: Vec<_> = wait_for.into_iter().map(Box::pin).collect();
let mut ret = HashMap::new();
while !wait_for.is_empty() {
let ((test_name, outcome), _idx, remaining) = select_all(wait_for).await;
wait_for = remaining;
let (test_name, outcome) = select! {
biased;
_ = self.ct.cancelled() => return Err(DepWaitError::Canceled),
((test_name, outcome), _idx, remaining) = select_all(wait_for) => {
wait_for = remaining;
(test_name, outcome)
}
};

// We are squashing lots of different types of failures and aborts
// (including the "impossible" case that the sender has been dropped
// and the rx.wait_for call failed) here, we trust that the other
Expand All @@ -727,7 +742,7 @@ impl<'a> TestJob {
"Dependency {:?} of {:?} failed: {:?}",
test_name, self.test_case.test.name, outcome
);
return Err(test_name.clone());
return Err(DepWaitError::DependencyFailed(test_name.clone()));
}
debug!("{:?}: Dependencies succeeded", self.test_case);
Ok(ret)
Expand Down Expand Up @@ -1153,6 +1168,7 @@ mod tests {
// We would like this to be an OsStr but you can't do that according to
// https://stackoverflow.com/questions/49226783/is-there-any-way-to-represent-an-osstr-or-osstring-literal
pub const BLOCK_COMMIT_MSG_TAG: &'static str = "block_this_test";
pub const SLOW_SIGTERM_TAG: &'static str = "slow_sigterm";

// Generate a tag which, when put in the commit message of a commit, will result in the test
// returning the given exit code.
Expand Down Expand Up @@ -1197,6 +1213,9 @@ mod tests {
exit_code=$(echo \"$commit_msg\" | perl -n -e'/exit_code\\((\\d+)\\)/ && print $1')
on_sigterm() {{
touch {sigtermed_path_prefix:?}$(git rev-parse $LIMMAT_COMMIT)
if [[ \"$commit_msg\" =~ {slow_sigterm_tag} ]]; then
sleep 2
fi
exit ${{exit_code:-0}}
}}
trap on_sigterm SIGTERM
Expand Down Expand Up @@ -1235,6 +1254,7 @@ mod tests {
lock_filename = if use_lockfile { Self::LOCK_FILENAME } else { "" },
bug_detected_path = dir.path().join(Self::BUG_DETECTED_PATH),
block_tag = Self::BLOCK_COMMIT_MSG_TAG,
slow_sigterm_tag = Self::SLOW_SIGTERM_TAG,
);

Self {
Expand Down Expand Up @@ -2365,4 +2385,75 @@ mod tests {
// TODO: Test that changes to dependency config hashes invalidates
// result cache.
}

#[tokio::test]
async fn should_cancel_waiting_for_deps() {
let f = TestScriptFixture::builder()
.dependencies([(1, 0)])
.num_worktrees(2)
.build()
.await;

// Job 0 (dep) has SLOW_SIGTERM.
let mut msg = OsString::from(TestScript::BLOCK_COMMIT_MSG_TAG);
msg.push(" ");
msg.push(TestScript::SLOW_SIGTERM_TAG);

let commit = f.repo.commit(msg).await.unwrap();

let mut results = f.manager.results();
f.manager
.set_revisions(vec![commit.clone()])
.await
.unwrap();

// Wait for Job 0 to start.
timeout_5s(f.scripts[0].started(&commit.hash))
.await
.expect("Job 0 started");

// Job 1 should be waiting.

// Start timing.
let start = Instant::now();

// Cancel.
f.manager.cancel_running().await.unwrap();

// Check results. Job 1 should be Canceled quickly.
let mut job1_done = false;
let mut job0_done = false;
// We might receive "Enqueued" or "Started" notifications before "Finished".
// We only care about Finished.
loop {
let notif = select! {
n = results.recv() => n.unwrap(),
_ = sleep(Duration::from_secs(5)) => panic!("Timed out waiting for results"),
};

if notif.test_case.test.name.to_string() == "test_1" {
if let TestStatus::Finished(res) = &notif.status {
job1_done = true;
assert!(
start.elapsed() < Duration::from_secs(1),
"Job 1 took too long to cancel: {:?}",
start.elapsed()
);
if let Err(TestInconclusive::Canceled) = res {
// Good
} else {
panic!("Job 1 did not cancel, got {:?}", res);
}
}
}
if notif.test_case.test.name.to_string() == "test_0" {
if let TestStatus::Finished(_) = &notif.status {
job0_done = true;
}
}
if job1_done && job0_done {
break;
}
}
}
}