Skip to content

Commit

Permalink
fix: correctly drop the worker
Browse files Browse the repository at this point in the history
  • Loading branch information
laktek committed Mar 10, 2023
1 parent da832ae commit e32a7b7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 22 deletions.
47 changes: 31 additions & 16 deletions base/src/js_worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::utils::units::{bytes_to_display, human_elapsed, mib_to_bytes};

use anyhow::Error;
use anyhow::{bail, Error};
use deno_core::located_script_name;
use deno_core::url::Url;
use deno_core::JsRuntime;
Expand Down Expand Up @@ -63,6 +63,7 @@ pub struct JsWorker {
js_runtime: JsRuntime,
main_module_url: ModuleSpecifier,
unix_stream_tx: mpsc::UnboundedSender<UnixStream>,
halt_isolate_rx: oneshot::Receiver<()>,
}

impl JsWorker {
Expand Down Expand Up @@ -169,21 +170,24 @@ impl JsWorker {
op_state.put::<types::EnvVars>(env_vars);
}

let (halt_isolate_tx, halt_isolate_rx) = oneshot::channel::<()>();

let mut worker = Self {
js_runtime,
main_module_url,
unix_stream_tx,
halt_isolate_rx,
};

worker.start_controller_thread(worker_timeout_ms, memory_limit_rx);
worker.start_controller_thread(worker_timeout_ms, memory_limit_rx, halt_isolate_tx);
Ok(worker)
}

pub fn snapshot() {
unimplemented!();
}

pub fn run(self, shutdown_tx: oneshot::Sender<()>) -> Result<(), Error> {
pub fn run(mut self, shutdown_tx: oneshot::Sender<()>) -> Result<(), Error> {
let mut js_runtime = self.js_runtime;

let runtime = tokio::runtime::Builder::new_current_thread()
Expand All @@ -195,17 +199,26 @@ impl JsWorker {
let mod_id = js_runtime
.load_main_module(&self.main_module_url, None)
.await?;
let result = js_runtime.mod_evaluate(mod_id);
js_runtime.run_event_loop(false).await?;
let mod_result = js_runtime.mod_evaluate(mod_id);

result.await?
let result = tokio::select! {
_ = js_runtime.run_event_loop(false) => {
debug!("event loop completed");
mod_result.await?
}
_ = &mut self.halt_isolate_rx => {
debug!("worker exectution halted");
Ok(())
}
};

drop(js_runtime);
result
};

let local = tokio::task::LocalSet::new();
let res = local.block_on(&runtime, future);

// terminate the worker

if res.is_err() {
error!("worker thread panicked {:?}", res.as_ref().err().unwrap());
}
Expand All @@ -217,6 +230,7 @@ impl JsWorker {
&mut self,
worker_timeout_ms: u64,
mut memory_limit_rx: mpsc::UnboundedReceiver<u64>,
halt_execution_tx: oneshot::Sender<()>,
) {
let thread_safe_handle = self.js_runtime.v8_isolate().thread_safe_handle();

Expand All @@ -232,22 +246,23 @@ impl JsWorker {
debug!("max duration reached for the worker. terminating the worker. (duration {})", human_elapsed(worker_timeout_ms))
}
Some(val) = memory_limit_rx.recv() => {
error!("memory limit reached for the worker. terminating the worker. (used: {})", bytes_to_display(val))
error!("memory limit reached for the worker. terminating the worker. (used: {})", bytes_to_display(val));
thread_safe_handle.terminate_execution();
}
}
};
rt.block_on(future);

let ok = thread_safe_handle.terminate_execution();
if ok {
debug!("terminated execution");
} else {
debug!("worker is already destroyed");
if let Err(_) = halt_execution_tx.send(()) {
error!("failed to send the halt execution signal");
}
});
}

pub fn accept(&self, stream: UnixStream) -> () {
self.unix_stream_tx.send(stream);
pub fn accept(&self, stream: UnixStream) -> Result<(), Error> {
if let Err(e) = self.unix_stream_tx.send(stream) {
bail!(e)
}
Ok(())
}
}
8 changes: 2 additions & 6 deletions base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Server {
info!("serving function {}", service_name);

//let memory_limit_mb = u64::from(mem_limit);
let memory_limit_mb = (150 * 1024) as u64;
let memory_limit_mb = 150 as u64;
//let worker_timeout_ms = u64::from(service_timeout * 1000);
let worker_timeout_ms = (60 * 1000) as u64;

Expand All @@ -116,16 +116,12 @@ impl Server {
env_vars,
)?;

// check for worker error

worker.accept(recv_stream);
worker.accept(recv_stream)?;

// start the worker
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
worker.run(shutdown_tx)?;

debug!("js worker for {:?} started", service_path);

// wait for shutdown signal
let _ = shutdown_rx.blocking_recv();

Expand Down

0 comments on commit e32a7b7

Please sign in to comment.