Skip to content

Commit

Permalink
fix(cpu_timer): rid flakiness due to signal on integrate testing
Browse files Browse the repository at this point in the history
  • Loading branch information
nyannyacha committed Jan 16, 2024
1 parent 0765eb4 commit 5d332f3
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 44 deletions.
2 changes: 0 additions & 2 deletions crates/base/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub async fn start_server(
user_worker_policy: Option<WorkerPoolPolicy>,
import_map_path: Option<String>,
no_module_cache: bool,
no_signal_handler: bool,
callback_tx: Option<Sender<ServerCodes>>,
entrypoints: WorkerEntrypoints,
) -> Result<(), Error> {
Expand All @@ -35,7 +34,6 @@ pub async fn start_server(
user_worker_policy,
import_map_path,
no_module_cache,
no_signal_handler,
callback_tx,
entrypoints,
)
Expand Down
29 changes: 26 additions & 3 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ mod test {
#[tokio::test]
async fn test_module_code_no_eszip() {
let (worker_pool_tx, _) = mpsc::unbounded_channel::<UserWorkerMsgs>();
DenoRuntime::new(WorkerContextInitOpts {
let mut rt = DenoRuntime::new(WorkerContextInitOpts {
service_path: PathBuf::from("./test_cases/"),
no_module_cache: false,
import_map_path: None,
Expand All @@ -513,6 +513,12 @@ mod test {
})
.await
.expect("It should not panic");

unsafe {
// NOTE: This is necessary because `DenoRuntime::new()` detaches its
// isolate from the current thread.
rt.js_runtime.v8_isolate().enter();
}
}

#[tokio::test]
Expand Down Expand Up @@ -546,6 +552,11 @@ mod test {
.await;

let mut rt = runtime.unwrap();
unsafe {
// NOTE: This is necessary because `DenoRuntime::new()` detaches its
// isolate from the current thread.
rt.js_runtime.v8_isolate().enter();
}

let main_mod_ev = rt.js_runtime.mod_evaluate(rt.main_module_id);
let _ = rt.js_runtime.run_event_loop(false).await;
Expand Down Expand Up @@ -595,6 +606,11 @@ mod test {
.await;

let mut rt = runtime.unwrap();
unsafe {
// NOTE: This is necessary because `DenoRuntime::new()` detaches its
// isolate from the current thread.
rt.js_runtime.v8_isolate().enter();
}

let main_mod_ev = rt.js_runtime.mod_evaluate(rt.main_module_id);
let _ = rt.js_runtime.run_event_loop(false).await;
Expand Down Expand Up @@ -623,7 +639,7 @@ mod test {
) -> DenoRuntime {
let (worker_pool_tx, _) = mpsc::unbounded_channel::<UserWorkerMsgs>();

DenoRuntime::new(WorkerContextInitOpts {
let mut rt = DenoRuntime::new(WorkerContextInitOpts {
service_path: path.unwrap_or(PathBuf::from("./test_cases/main")),
no_module_cache: false,
import_map_path: None,
Expand All @@ -642,7 +658,14 @@ mod test {
},
})
.await
.unwrap()
.unwrap();

unsafe {
// NOTE: This is necessary because `DenoRuntime::new()` detaches its
// isolate from the current thread.
rt.js_runtime.v8_isolate().enter();
rt
}
}

// Main Runtime should have access to `EdgeRuntime`
Expand Down
1 change: 0 additions & 1 deletion crates/base/src/macros/test_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ macro_rules! integration_test {
None,
None,
false,
true,
Some(tx.clone()),
$crate::server::WorkerEntrypoints {
main: None,
Expand Down
6 changes: 0 additions & 6 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ impl Server {
maybe_user_worker_policy: Option<WorkerPoolPolicy>,
import_map_path: Option<String>,
no_module_cache: bool,
no_signal_handler: bool,
callback_tx: Option<Sender<ServerCodes>>,
entrypoints: WorkerEntrypoints,
) -> Result<Self, Error> {
Expand Down Expand Up @@ -213,11 +212,6 @@ impl Server {
)
.await?;

if !no_signal_handler {
// register alarm signal handler
cpu_timer::register_alarm()?;
}

let ip = Ipv4Addr::from_str(ip)?;
Ok(Self {
ip,
Expand Down
1 change: 0 additions & 1 deletion crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ fn main() -> Result<(), anyhow::Error> {
)),
import_map_path,
no_module_cache,
cfg!(not(target_os = "linux")),
None,
WorkerEntrypoints {
main: maybe_main_entrypoint,
Expand Down
1 change: 1 addition & 0 deletions crates/cpu_timer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ libc = { workspace = true }
nix = { version = "0.26.2", features = ["signal"] }
tokio = { workspace = true }
log = { workspace = true }
ctor = { version = "0.2.6" }
84 changes: 53 additions & 31 deletions crates/cpu_timer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ pub mod timerid;

use std::sync::Arc;

#[cfg(target_os = "linux")]
use crate::timerid::TimerId;
use anyhow::Error;
use tokio::sync::mpsc;

#[cfg(target_os = "linux")]
use anyhow::bail;
use anyhow::Error;
use log::debug;
use nix::sys::signal;
use tokio::sync::{mpsc, Mutex};
// Private helper module that gathers, in one place, the items used by the
// timer/signal code in this file. Consumers either glob-import it inside a
// function body (`use linux::*;`) or refer to items by qualified path
// (`linux::TimerId`, `linux::Mutex`, `linux::ctor`).
// NOTE(review): presumably this module is gated by the
// `#[cfg(target_os = "linux")]` attribute that precedes it — confirm in the
// actual file, since this view interleaves removed and added lines.
mod linux {
// Timer handle type stored in `CPUTimerVal::id`.
pub use crate::timerid::TimerId;
pub use anyhow::bail;
// Attribute macro applied to `register_sigalrm` via `linux::ctor`.
pub use ctor::ctor;
pub use log::debug;
// Signal API used to install the `SIGALRM` handler.
pub use nix::sys::signal;
// Mutex type wrapping `CPUTimerVal` inside `CPUTimer`.
pub use tokio::sync::Mutex;
}

#[repr(C)]
#[derive(Clone)]
Expand All @@ -20,7 +23,7 @@ pub struct CPUAlarmVal {

#[cfg(target_os = "linux")]
struct CPUTimerVal {
id: TimerId,
id: linux::TimerId,
initial_expiry: u64,
interval: u64,
}
Expand All @@ -31,7 +34,7 @@ unsafe impl Send for CPUTimerVal {}
#[cfg(target_os = "linux")]
#[derive(Clone)]
pub struct CPUTimer {
_timer: Arc<Mutex<CPUTimerVal>>,
_timer: Arc<linux::Mutex<CPUTimerVal>>,
_cpu_alarm_val: Arc<CPUAlarmVal>,
}

Expand All @@ -46,6 +49,8 @@ impl CPUTimer {
interval: u64,
cpu_alarm_val: CPUAlarmVal,
) -> Result<Self, Error> {
use linux::*;

let mut timerid = TimerId(std::ptr::null_mut());
let mut sigev: libc::sigevent = unsafe { std::mem::zeroed() };
let cpu_alarm_val = Arc::new(cpu_alarm_val);
Expand Down Expand Up @@ -86,6 +91,7 @@ impl CPUTimer {
#[cfg(target_os = "linux")]
pub fn reset(&self) -> Result<(), Error> {
use anyhow::Context;
use linux::*;

let timer = self._timer.try_lock().context("failed to get the lock")?;

Expand Down Expand Up @@ -123,43 +129,59 @@ impl CPUTimer {
}
}

extern "C" fn sigalrm_handler(_: libc::c_int, info: *mut libc::siginfo_t, _: *mut libc::c_void) {
let cpu_alarms_tx = unsafe {
let sival = (*info).si_value();
let val = Arc::from_raw(sival.sival_ptr as *const CPUAlarmVal);
let sig = val.cpu_alarms_tx.clone();

std::mem::forget(val);
sig
pub fn get_thread_time() -> Result<i64, Error> {
let mut time = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};

if cpu_alarms_tx.send(()).is_err() {
debug!("failed to send cpu alarm to the provided channel");
if unsafe { libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut time) } == -1 {
return Err(std::io::Error::last_os_error().into());
}

// convert seconds to nanoseconds and add to nsec value
Ok(time.tv_sec * 1_000_000_000 + time.tv_nsec)
}

pub fn register_alarm() -> Result<(), Error> {
#[cfg_attr(target_os = "linux", linux::ctor)]
#[cfg(target_os = "linux")]
fn register_sigalrm() {
use linux::*;

let sig_handler = signal::SigHandler::SigAction(sigalrm_handler);
let sig_action = signal::SigAction::new(
sig_handler,
signal::SaFlags::empty(),
signal::SigSet::empty(),
);

unsafe {
signal::sigaction(signal::SIGALRM, &sig_action)?;
if let Err(err) = signal::sigaction(signal::SIGALRM, &sig_action) {
panic!("can't register signal handler: {}", err);
}
}
Ok(())
}

pub fn get_thread_time() -> Result<i64, Error> {
let mut time = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
#[cfg(target_os = "linux")]
extern "C" fn sigalrm_handler(
signo: libc::c_int,
info: *mut libc::siginfo_t,
_: *mut libc::c_void,
) {
use linux::*;

assert_eq!(signo, signal::SIGALRM as libc::c_int);

let cpu_alarms_tx = unsafe {
let sival = (*info).si_value();
let val = Arc::from_raw(sival.sival_ptr as *const CPUAlarmVal);
let sig = val.cpu_alarms_tx.clone();

std::mem::forget(val);
sig
};
if unsafe { libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut time) } == -1 {
return Err(std::io::Error::last_os_error().into());
}

// convert seconds to nanoseconds and add to nsec value
Ok(time.tv_sec * 1_000_000_000 + time.tv_nsec)
if cpu_alarms_tx.send(()).is_err() {
debug!("failed to send cpu alarm to the provided channel");
}
}

0 comments on commit 5d332f3

Please sign in to comment.