diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 4428d9b2..be0d8c06 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -40,9 +40,10 @@ use std::sync::{Arc, RwLock}; use std::task::Poll; use std::thread::ThreadId; use std::time::Duration; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::time::interval; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, PollSemaphore}; +use tracing::debug; use crate::snapshot; use event_worker::events::{EventMetadata, WorkerEventWithMetadata}; @@ -93,6 +94,15 @@ pub static SHOULD_USE_VERBOSE_DEPRECATED_API_WARNING: OnceCell = OnceCell: pub static SHOULD_INCLUDE_MALLOCED_MEMORY_ON_MEMCHECK: OnceCell = OnceCell::new(); pub static MAYBE_DENO_VERSION: OnceCell = OnceCell::new(); +thread_local! { + // NOTE: Initializing a DenoRuntime can hit `.await` points. While it is + // suspended there, a task initializing another DenoRuntime can corrupt the + // current v8 isolate's thread-local state, so we must prevent this with a + // Semaphore. 
+ + static RUNTIME_CREATION_SEM: Arc = Arc::new(Semaphore::new(1)); +} + #[ctor] fn init_v8_platform() { set_v8_flags(); @@ -219,6 +229,16 @@ impl Drop for DenoRuntime { } } +impl DenoRuntime<()> { + pub async fn acquire() -> OwnedSemaphorePermit { + RUNTIME_CREATION_SEM + .with(|v| v.clone()) + .acquire_owned() + .await + .unwrap() + } +} + impl DenoRuntime where RuntimeContext: GetRuntimeContext, @@ -702,7 +722,7 @@ where let mut accumulated_cpu_time_ns = 0i64; let has_inspector = self.inspector().is_some(); - let mod_result_rx = unsafe { + let mut mod_result_rx = unsafe { self.js_runtime.v8_isolate().enter(); if has_inspector { @@ -750,38 +770,38 @@ where }; } - // { - // let event_loop_fut = self.run_event_loop( - // name.as_deref(), - // current_thread_id, - // &maybe_cpu_usage_metrics_tx, - // &mut accumulated_cpu_time_ns, - // ); - - // let mod_result = tokio::select! { - // // Not using biased mode leads to non-determinism for relatively simple - // // programs. - // biased; - - // maybe_mod_result = &mut mod_result_rx => { - // debug!("received module evaluate {:#?}", maybe_mod_result); - // maybe_mod_result - - // } - - // event_loop_result = event_loop_fut => { - // if let Err(err) = event_loop_result { - // Err(anyhow!("event loop error while evaluating the module: {}", err)) - // } else { - // mod_result_rx.await - // } - // } - // }; - - // if let Err(err) = mod_result { - // return (Err(err), get_accumulated_cpu_time_ms!()); - // } - // } + { + let event_loop_fut = self.run_event_loop( + name.as_deref(), + current_thread_id, + &maybe_cpu_usage_metrics_tx, + &mut accumulated_cpu_time_ns, + ); + + let mod_result = tokio::select! { + // Not using biased mode leads to non-determinism for relatively simple + // programs. 
+ biased; + + maybe_mod_result = &mut mod_result_rx => { + debug!("received module evaluate {:#?}", maybe_mod_result); + maybe_mod_result + + } + + event_loop_result = event_loop_fut => { + if let Err(err) = event_loop_result { + Err(anyhow!("event loop error while evaluating the module: {}", err)) + } else { + mod_result_rx.await + } + } + }; + + if let Err(err) = mod_result { + return (Err(err), get_accumulated_cpu_time_ms!()); + } + } if let Err(err) = self .run_event_loop( @@ -798,10 +818,6 @@ where ); } - if let Err(err) = mod_result_rx.await { - return (Err(err), get_accumulated_cpu_time_ms!()); - } - (Ok(()), get_accumulated_cpu_time_ms!()) } @@ -818,8 +834,19 @@ where let termination_request_token = self.termination_request_token.clone(); let mem_check_state = is_user_worker.then(|| self.mem_check.clone()); + let mut poll_sem = None::; poll_fn(move |cx| { + if poll_sem.is_none() { + poll_sem = Some(RUNTIME_CREATION_SEM.with(|v| PollSemaphore::new(v.clone()))); + } + + let Poll::Ready(Some(_permit)) = poll_sem.as_mut().unwrap().poll_acquire(cx) else { + return Poll::Pending; + }; + + poll_sem = None; + // INVARIANT: Only can steal current task by other threads when LIFO // task scheduler heuristic disabled. Turning off the heuristic is // unstable now, so it's not considered. 
diff --git a/crates/base/src/rt_worker/worker.rs b/crates/base/src/rt_worker/worker.rs index a6e1e8dc..67881e57 100644 --- a/crates/base/src/rt_worker/worker.rs +++ b/crates/base/src/rt_worker/worker.rs @@ -126,10 +126,23 @@ impl Worker { .then(unbounded_channel::) .unzip(); + let permit = DenoRuntime::acquire().await; let result = match DenoRuntime::new(opts, inspector).await { - Ok(mut new_runtime) => { + Ok(new_runtime) => { + let mut runtime = scopeguard::guard(new_runtime, |mut runtime| { + unsafe { + runtime.js_runtime.v8_isolate().enter(); + } + }); + + unsafe { + runtime.js_runtime.v8_isolate().exit(); + } + + drop(permit); + let metric_src = { - let js_runtime = &mut new_runtime.js_runtime; + let js_runtime = &mut runtime.js_runtime; let metric_src = WorkerMetricSource::from_js_runtime(js_runtime); if worker_kind.is_main_worker() { @@ -164,7 +177,7 @@ impl Worker { // cputimer is returned from supervisor and assigned here to keep it in scope. let Ok((maybe_timer, cancel_token)) = create_supervisor( worker_key.unwrap_or(Uuid::nil()), - &mut new_runtime, + &mut runtime, supervisor_policy, termination_event_tx, pool_msg_tx.clone(), @@ -181,12 +194,12 @@ impl Worker { pending().boxed() } else if let Some(token) = termination_token.clone() { - let is_terminated = new_runtime.is_terminated.clone(); + let is_terminated = runtime.is_terminated.clone(); let termination_request_token = - new_runtime.termination_request_token.clone(); + runtime.termination_request_token.clone(); let (waker, thread_safe_handle) = { - let js_runtime = &mut new_runtime.js_runtime; + let js_runtime = &mut runtime.js_runtime; ( js_runtime.op_state().borrow().waker.clone(), js_runtime.v8_isolate().thread_safe_handle(), @@ -247,11 +260,7 @@ impl Worker { }); }); - let result = unsafe { - let mut runtime = scopeguard::guard(new_runtime, |mut runtime| { - runtime.js_runtime.v8_isolate().enter(); - }); - + let result = { let supervise_cancel_token = 
scopeguard::guard_on_unwind(supervise_cancel_token, |token| { if let Some(token) = token { @@ -259,7 +268,6 @@ impl Worker { } }); - runtime.js_runtime.v8_isolate().exit(); let result = method_cloner .handle_creation( @@ -303,8 +311,11 @@ impl Worker { } Err(err) => { + drop(permit); + let _ = booter_signal .send(Err(anyhow!("worker boot error: {err}"))); + method_cloner.handle_error(err) } };