Skip to content

Commit

Permalink
fix(base): thread-local state of v8::Isolate can be corrupted while…
Browse files Browse the repository at this point in the history
… initializing DenoRuntime in `Worker::start` (#416)

* fix(base): thread-local state of `v8::Isolate` can be corrupted while initializing DenoRuntime in `Worker::start`

* Revert "fix(base): partial revert some changes that were introduced from deno upgrade PR (#415)"

This reverts commit db2cc41.

* stamp: more robustly
  • Loading branch information
nyannyacha authored Sep 30, 2024
1 parent e8759ec commit 539b706
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 51 deletions.
105 changes: 66 additions & 39 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ use std::sync::{Arc, RwLock};
use std::task::Poll;
use std::thread::ThreadId;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::{CancellationToken, PollSemaphore};
use tracing::debug;

use crate::snapshot;
use event_worker::events::{EventMetadata, WorkerEventWithMetadata};
Expand Down Expand Up @@ -93,6 +94,15 @@ pub static SHOULD_USE_VERBOSE_DEPRECATED_API_WARNING: OnceCell<bool> = OnceCell:
pub static SHOULD_INCLUDE_MALLOCED_MEMORY_ON_MEMCHECK: OnceCell<bool> = OnceCell::new();
pub static MAYBE_DENO_VERSION: OnceCell<String> = OnceCell::new();

thread_local! {
// NOTE: Suppose we have met `.await` points while initializing a
// DenoRuntime. In that case, the current v8 isolate's thread-local state can be
// corrupted by a task initializing another DenoRuntime, so we must prevent this
// with a Semaphore.

static RUNTIME_CREATION_SEM: Arc<Semaphore> = Arc::new(Semaphore::new(1));
}

#[ctor]
fn init_v8_platform() {
set_v8_flags();
Expand Down Expand Up @@ -219,6 +229,16 @@ impl<RuntimeContext> Drop for DenoRuntime<RuntimeContext> {
}
}

impl DenoRuntime<()> {
pub async fn acquire() -> OwnedSemaphorePermit {
RUNTIME_CREATION_SEM
.with(|v| v.clone())
.acquire_owned()
.await
.unwrap()
}
}

impl<RuntimeContext> DenoRuntime<RuntimeContext>
where
RuntimeContext: GetRuntimeContext,
Expand Down Expand Up @@ -702,7 +722,7 @@ where
let mut accumulated_cpu_time_ns = 0i64;

let has_inspector = self.inspector().is_some();
let mod_result_rx = unsafe {
let mut mod_result_rx = unsafe {
self.js_runtime.v8_isolate().enter();

if has_inspector {
Expand Down Expand Up @@ -750,38 +770,38 @@ where
};
}

// {
// let event_loop_fut = self.run_event_loop(
// name.as_deref(),
// current_thread_id,
// &maybe_cpu_usage_metrics_tx,
// &mut accumulated_cpu_time_ns,
// );

// let mod_result = tokio::select! {
// // Not using biased mode leads to non-determinism for relatively simple
// // programs.
// biased;

// maybe_mod_result = &mut mod_result_rx => {
// debug!("received module evaluate {:#?}", maybe_mod_result);
// maybe_mod_result

// }

// event_loop_result = event_loop_fut => {
// if let Err(err) = event_loop_result {
// Err(anyhow!("event loop error while evaluating the module: {}", err))
// } else {
// mod_result_rx.await
// }
// }
// };

// if let Err(err) = mod_result {
// return (Err(err), get_accumulated_cpu_time_ms!());
// }
// }
{
let event_loop_fut = self.run_event_loop(
name.as_deref(),
current_thread_id,
&maybe_cpu_usage_metrics_tx,
&mut accumulated_cpu_time_ns,
);

let mod_result = tokio::select! {
// Not using biased mode leads to non-determinism for relatively simple
// programs.
biased;

maybe_mod_result = &mut mod_result_rx => {
debug!("received module evaluate {:#?}", maybe_mod_result);
maybe_mod_result

}

event_loop_result = event_loop_fut => {
if let Err(err) = event_loop_result {
Err(anyhow!("event loop error while evaluating the module: {}", err))
} else {
mod_result_rx.await
}
}
};

if let Err(err) = mod_result {
return (Err(err), get_accumulated_cpu_time_ms!());
}
}

if let Err(err) = self
.run_event_loop(
Expand All @@ -798,10 +818,6 @@ where
);
}

if let Err(err) = mod_result_rx.await {
return (Err(err), get_accumulated_cpu_time_ms!());
}

(Ok(()), get_accumulated_cpu_time_ms!())
}

Expand All @@ -818,8 +834,19 @@ where
let termination_request_token = self.termination_request_token.clone();

let mem_check_state = is_user_worker.then(|| self.mem_check.clone());
let mut poll_sem = None::<PollSemaphore>;

poll_fn(move |cx| {
if poll_sem.is_none() {
poll_sem = Some(RUNTIME_CREATION_SEM.with(|v| PollSemaphore::new(v.clone())));
}

let Poll::Ready(Some(_permit)) = poll_sem.as_mut().unwrap().poll_acquire(cx) else {
return Poll::Pending;
};

poll_sem = None;

// INVARIANT: Only can steal current task by other threads when LIFO
// task scheduler heuristic disabled. Turning off the heuristic is
// unstable now, so it's not considered.
Expand Down
35 changes: 23 additions & 12 deletions crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,23 @@ impl Worker {
.then(unbounded_channel::<CPUUsageMetrics>)
.unzip();

let permit = DenoRuntime::acquire().await;
let result = match DenoRuntime::new(opts, inspector).await {
Ok(mut new_runtime) => {
Ok(new_runtime) => {
let mut runtime = scopeguard::guard(new_runtime, |mut runtime| {
unsafe {
runtime.js_runtime.v8_isolate().enter();
}
});

unsafe {
runtime.js_runtime.v8_isolate().exit();
}

drop(permit);

let metric_src = {
let js_runtime = &mut new_runtime.js_runtime;
let js_runtime = &mut runtime.js_runtime;
let metric_src = WorkerMetricSource::from_js_runtime(js_runtime);

if worker_kind.is_main_worker() {
Expand Down Expand Up @@ -164,7 +177,7 @@ impl Worker {
// cputimer is returned from supervisor and assigned here to keep it in scope.
let Ok((maybe_timer, cancel_token)) = create_supervisor(
worker_key.unwrap_or(Uuid::nil()),
&mut new_runtime,
&mut runtime,
supervisor_policy,
termination_event_tx,
pool_msg_tx.clone(),
Expand All @@ -181,12 +194,12 @@ impl Worker {

pending().boxed()
} else if let Some(token) = termination_token.clone() {
let is_terminated = new_runtime.is_terminated.clone();
let is_terminated = runtime.is_terminated.clone();
let termination_request_token =
new_runtime.termination_request_token.clone();
runtime.termination_request_token.clone();

let (waker, thread_safe_handle) = {
let js_runtime = &mut new_runtime.js_runtime;
let js_runtime = &mut runtime.js_runtime;
(
js_runtime.op_state().borrow().waker.clone(),
js_runtime.v8_isolate().thread_safe_handle(),
Expand Down Expand Up @@ -247,19 +260,14 @@ impl Worker {
});
});

let result = unsafe {
let mut runtime = scopeguard::guard(new_runtime, |mut runtime| {
runtime.js_runtime.v8_isolate().enter();
});

let result = {
let supervise_cancel_token =
scopeguard::guard_on_unwind(supervise_cancel_token, |token| {
if let Some(token) = token {
token.cancel();
}
});

runtime.js_runtime.v8_isolate().exit();

let result = method_cloner
.handle_creation(
Expand Down Expand Up @@ -303,8 +311,11 @@ impl Worker {
}

Err(err) => {
drop(permit);

let _ = booter_signal
.send(Err(anyhow!("worker boot error: {err}")));

method_cloner.handle_error(err)
}
};
Expand Down

0 comments on commit 539b706

Please sign in to comment.