Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libtest: Wait for test threads to exit after they report completion #81367

Merged
merged 2 commits into from
Jan 26, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 64 additions & 24 deletions library/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#![feature(nll)]
#![feature(available_concurrency)]
#![feature(internal_output_capture)]
#![feature(option_unwrap_none)]
#![feature(panic_unwind)]
#![feature(staged_api)]
#![feature(termination_trait_lib)]
Expand Down Expand Up @@ -61,6 +62,7 @@ pub mod test {
}

use std::{
collections::VecDeque,
env, io,
io::prelude::Write,
panic::{self, catch_unwind, AssertUnwindSafe, PanicInfo},
Expand Down Expand Up @@ -208,9 +210,19 @@ where
use std::collections::{self, HashMap};
use std::hash::BuildHasherDefault;
use std::sync::mpsc::RecvTimeoutError;

struct RunningTest {
join_handle: Option<thread::JoinHandle<()>>,
}

// Use a deterministic hasher
type TestMap =
HashMap<TestDesc, Instant, BuildHasherDefault<collections::hash_map::DefaultHasher>>;
HashMap<TestDesc, RunningTest, BuildHasherDefault<collections::hash_map::DefaultHasher>>;

struct TimeoutEntry {
desc: TestDesc,
timeout: Instant,
}

let tests_len = tests.len();

Expand Down Expand Up @@ -255,23 +267,30 @@ where
};

let mut running_tests: TestMap = HashMap::default();
let mut timeout_queue: VecDeque<TimeoutEntry> = VecDeque::new();

fn get_timed_out_tests(running_tests: &mut TestMap) -> Vec<TestDesc> {
fn get_timed_out_tests(
running_tests: &TestMap,
timeout_queue: &mut VecDeque<TimeoutEntry>,
) -> Vec<TestDesc> {
let now = Instant::now();
let timed_out = running_tests
.iter()
.filter_map(|(desc, timeout)| if &now >= timeout { Some(desc.clone()) } else { None })
.collect();
for test in &timed_out {
running_tests.remove(test);
let mut timed_out = Vec::new();
while let Some(timeout_entry) = timeout_queue.front() {
if now < timeout_entry.timeout {
break;
}
let timeout_entry = timeout_queue.pop_front().unwrap();
if running_tests.contains_key(&timeout_entry.desc) {
timed_out.push(timeout_entry.desc);
}
}
timed_out
}

fn calc_timeout(running_tests: &TestMap) -> Option<Duration> {
running_tests.values().min().map(|next_timeout| {
fn calc_timeout(timeout_queue: &VecDeque<TimeoutEntry>) -> Option<Duration> {
timeout_queue.front().map(|&TimeoutEntry { timeout: next_timeout, .. }| {
let now = Instant::now();
if *next_timeout >= now { *next_timeout - now } else { Duration::new(0, 0) }
if next_timeout >= now { next_timeout - now } else { Duration::new(0, 0) }
})
}

Expand All @@ -280,7 +299,8 @@ where
let test = remaining.pop().unwrap();
let event = TestEvent::TeWait(test.desc.clone());
notify_about_test_event(event)?;
run_test(opts, !opts.run_tests, test, run_strategy, tx.clone(), Concurrent::No);
run_test(opts, !opts.run_tests, test, run_strategy, tx.clone(), Concurrent::No)
.unwrap_none();
let completed_test = rx.recv().unwrap();

let event = TestEvent::TeResult(completed_test);
Expand All @@ -291,19 +311,28 @@ where
while pending < concurrency && !remaining.is_empty() {
let test = remaining.pop().unwrap();
let timeout = time::get_default_test_timeout();
running_tests.insert(test.desc.clone(), timeout);
let desc = test.desc.clone();

let event = TestEvent::TeWait(test.desc.clone());
let event = TestEvent::TeWait(desc.clone());
notify_about_test_event(event)?; //here no pad
run_test(opts, !opts.run_tests, test, run_strategy, tx.clone(), Concurrent::Yes);
let join_handle = run_test(
opts,
!opts.run_tests,
test,
run_strategy,
tx.clone(),
Concurrent::Yes,
);
running_tests.insert(desc.clone(), RunningTest { join_handle });
timeout_queue.push_back(TimeoutEntry { desc, timeout });
pending += 1;
}

let mut res;
loop {
if let Some(timeout) = calc_timeout(&running_tests) {
if let Some(timeout) = calc_timeout(&timeout_queue) {
res = rx.recv_timeout(timeout);
for test in get_timed_out_tests(&mut running_tests) {
for test in get_timed_out_tests(&running_tests, &mut timeout_queue) {
let event = TestEvent::TeTimeout(test);
notify_about_test_event(event)?;
}
Expand All @@ -323,8 +352,16 @@ where
}
}

let completed_test = res.unwrap();
running_tests.remove(&completed_test.desc);
let mut completed_test = res.unwrap();
let running_test = running_tests.remove(&completed_test.desc).unwrap();
if let Some(join_handle) = running_test.join_handle {
if let Err(_) = join_handle.join() {
if let TrOk = completed_test.result {
completed_test.result =
TrFailedMsg("panicked after reporting success".to_string());
}
}
}

let event = TestEvent::TeResult(completed_test);
notify_about_test_event(event)?;
Expand Down Expand Up @@ -415,7 +452,7 @@ pub fn run_test(
strategy: RunStrategy,
monitor_ch: Sender<CompletedTest>,
concurrency: Concurrent,
) {
) -> Option<thread::JoinHandle<()>> {
let TestDescAndFn { desc, testfn } = test;

// Emscripten can catch panics but other wasm targets cannot
Expand All @@ -426,7 +463,7 @@ pub fn run_test(
if force_ignore || desc.ignore || ignore_because_no_process_support {
let message = CompletedTest::new(desc, TrIgnored, None, Vec::new());
monitor_ch.send(message).unwrap();
return;
return None;
}

struct TestRunOpts {
Expand All @@ -441,7 +478,7 @@ pub fn run_test(
monitor_ch: Sender<CompletedTest>,
testfn: Box<dyn FnOnce() + Send>,
opts: TestRunOpts,
) {
) -> Option<thread::JoinHandle<()>> {
let concurrency = opts.concurrency;
let name = desc.name.clone();

Expand Down Expand Up @@ -469,9 +506,10 @@ pub fn run_test(
let supports_threads = !cfg!(target_os = "emscripten") && !cfg!(target_arch = "wasm32");
if concurrency == Concurrent::Yes && supports_threads {
let cfg = thread::Builder::new().name(name.as_slice().to_owned());
cfg.spawn(runtest).unwrap();
Some(cfg.spawn(runtest).unwrap())
} else {
runtest();
None
}
}

Expand All @@ -484,10 +522,12 @@ pub fn run_test(
crate::bench::benchmark(desc, monitor_ch, opts.nocapture, |harness| {
bencher.run(harness)
});
None
}
StaticBenchFn(benchfn) => {
// Benchmarks aren't expected to panic, so we run them all in-process.
crate::bench::benchmark(desc, monitor_ch, opts.nocapture, benchfn);
None
}
DynTestFn(f) => {
match strategy {
Expand All @@ -499,7 +539,7 @@ pub fn run_test(
monitor_ch,
Box::new(move || __rust_begin_short_backtrace(f)),
test_run_opts,
);
)
}
StaticTestFn(f) => run_test_inner(
desc,
Expand Down