diff --git a/base/src/js_worker.rs b/base/src/js_worker.rs index b769c0d9..dea7b7a7 100644 --- a/base/src/js_worker.rs +++ b/base/src/js_worker.rs @@ -1,6 +1,6 @@ use crate::utils::units::{bytes_to_display, human_elapsed, mib_to_bytes}; -use anyhow::Error; +use anyhow::{bail, Error}; use deno_core::located_script_name; use deno_core::url::Url; use deno_core::JsRuntime; @@ -63,6 +63,7 @@ pub struct JsWorker { js_runtime: JsRuntime, main_module_url: ModuleSpecifier, unix_stream_tx: mpsc::UnboundedSender, + halt_isolate_rx: oneshot::Receiver<()>, } impl JsWorker { @@ -169,13 +170,16 @@ impl JsWorker { op_state.put::(env_vars); } + let (halt_isolate_tx, halt_isolate_rx) = oneshot::channel::<()>(); + let mut worker = Self { js_runtime, main_module_url, unix_stream_tx, + halt_isolate_rx, }; - worker.start_controller_thread(worker_timeout_ms, memory_limit_rx); + worker.start_controller_thread(worker_timeout_ms, memory_limit_rx, halt_isolate_tx); Ok(worker) } @@ -183,7 +187,7 @@ impl JsWorker { unimplemented!(); } - pub fn run(self, shutdown_tx: oneshot::Sender<()>) -> Result<(), Error> { + pub fn run(mut self, shutdown_tx: oneshot::Sender<()>) -> Result<(), Error> { let mut js_runtime = self.js_runtime; let runtime = tokio::runtime::Builder::new_current_thread() @@ -195,17 +199,26 @@ impl JsWorker { let mod_id = js_runtime .load_main_module(&self.main_module_url, None) .await?; - let result = js_runtime.mod_evaluate(mod_id); - js_runtime.run_event_loop(false).await?; + let mod_result = js_runtime.mod_evaluate(mod_id); - result.await? + let result = tokio::select! { + _ = js_runtime.run_event_loop(false) => { + debug!("event loop completed"); + mod_result.await? + } + _ = &mut self.halt_isolate_rx => { + debug!("worker exectution halted"); + Ok(()) + } + }; + + drop(js_runtime); + result }; let local = tokio::task::LocalSet::new(); let res = local.block_on(&runtime, future); - // terminate the worker - if res.is_err() { error!("worker thread panicked {:?}", res.as_ref().err().unwrap()); } @@ -217,6 +230,7 @@ impl JsWorker { &mut self, worker_timeout_ms: u64, mut memory_limit_rx: mpsc::UnboundedReceiver, + halt_execution_tx: oneshot::Sender<()>, ) { let thread_safe_handle = self.js_runtime.v8_isolate().thread_safe_handle(); @@ -232,22 +246,23 @@ impl JsWorker { debug!("max duration reached for the worker. terminating the worker. (duration {})", human_elapsed(worker_timeout_ms)) } Some(val) = memory_limit_rx.recv() => { - error!("memory limit reached for the worker. terminating the worker. (used: {})", bytes_to_display(val)) + error!("memory limit reached for the worker. terminating the worker. (used: {})", bytes_to_display(val)); + thread_safe_handle.terminate_execution(); } } }; rt.block_on(future); - let ok = thread_safe_handle.terminate_execution(); - if ok { - debug!("terminated execution"); - } else { - debug!("worker is already destroyed"); + if let Err(_) = halt_execution_tx.send(()) { + error!("failed to send the halt execution signal"); } }); } - pub fn accept(&self, stream: UnixStream) -> () { - self.unix_stream_tx.send(stream); + pub fn accept(&self, stream: UnixStream) -> Result<(), Error> { + if let Err(e) = self.unix_stream_tx.send(stream) { + bail!(e) + } + Ok(()) } } diff --git a/base/src/server.rs b/base/src/server.rs index 5336fe2a..2d34a15e 100644 --- a/base/src/server.rs +++ b/base/src/server.rs @@ -94,7 +94,7 @@ impl Server { info!("serving function {}", service_name); //let memory_limit_mb = u64::from(mem_limit); - let memory_limit_mb = (150 * 1024) as u64; + let memory_limit_mb = 150 as u64; //let worker_timeout_ms = u64::from(service_timeout * 1000); let worker_timeout_ms = (60 * 1000) as u64; @@ -116,16 +116,12 @@ impl Server { env_vars, )?; - // check for worker error - - worker.accept(recv_stream); + worker.accept(recv_stream)?; // start the worker let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); worker.run(shutdown_tx)?; - debug!("js worker for {:?} started", service_path); - // wait for shutdown signal let _ = shutdown_rx.blocking_recv();