feat: introduce EdgeRuntime.getRuntimeMetrics
nyannyacha committed Feb 12, 2024
1 parent ee38bb8 commit cb9dcea
Showing 14 changed files with 223 additions and 105 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

11 changes: 8 additions & 3 deletions crates/base/src/deno_runtime.rs
```diff
@@ -421,7 +421,7 @@ impl DenoRuntime {
         // the task from the other threads.
         // let mut current_thread_id = std::thread::current().id();
 
-        let result = match poll_fn(|cx| {
+        let poll_result = poll_fn(|cx| {
             // INVARIANT: Only can steal current task by other threads when LIFO
             // task scheduler heuristic disabled. Turning off the heuristic is
             // unstable now, so it's not considered.
@@ -487,8 +487,9 @@ impl DenoRuntime {
 
             poll_result
         })
-        .await
-        {
+        .await;
+
+        let result = match poll_result {
             Err(err) => Err(anyhow!("event loop error: {}", err)),
             Ok(_) => match mod_result_rx.await {
                 Err(e) => {
@@ -576,6 +577,7 @@ mod test {
             conf: {
                 WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
                     worker_pool_tx,
+                    shared_metric_src: None,
                     event_worker_metric_src: None,
                 })
             },
@@ -620,6 +622,7 @@ mod test {
             conf: {
                 WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
                     worker_pool_tx,
+                    shared_metric_src: None,
                     event_worker_metric_src: None,
                 })
             },
@@ -686,6 +689,7 @@ mod test {
             conf: {
                 WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
                     worker_pool_tx,
+                    shared_metric_src: None,
                     event_worker_metric_src: None,
                 })
             },
@@ -748,6 +752,7 @@ mod test {
             } else {
                 WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
                     worker_pool_tx,
+                    shared_metric_src: None,
                     event_worker_metric_src: None,
                 })
             }
```
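The test fixtures above now pass `shared_metric_src: None` wherever a `MainWorkerRuntimeOpts` is built, which implies the struct gained a new optional field in this commit. A minimal sketch of the updated shape, inferred from the call sites (the real definition lives in `sb_workers::context` and is not part of the rendered diff):

```rust
// Hedged sketch: only `worker_pool_tx`, `shared_metric_src`, and
// `event_worker_metric_src` are confirmed by call sites in this diff.
// The field types are assumptions based on how worker.rs consumes them.
pub struct MainWorkerRuntimeOpts {
    pub worker_pool_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
    // New in this commit: pool-wide counters shared with the worker pool.
    pub shared_metric_src: Option<SharedMetricSource>,
    // Metric source reported by the events worker, if one is running.
    pub event_worker_metric_src: Option<MetricSource>,
}
```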
35 changes: 18 additions & 17 deletions crates/base/src/rt_worker/worker.rs
```diff
@@ -11,7 +11,7 @@ use event_worker::events::{
 use futures_util::FutureExt;
 use log::{debug, error};
 use sb_core::conn_sync::ConnSync;
-use sb_core::{RuntimeMetricSource, WorkerMetricSource};
+use sb_core::{MetricSource, RuntimeMetricSource, WorkerMetricSource};
 use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts};
 use std::any::Any;
 use std::future::{pending, Future};
@@ -88,7 +88,7 @@ impl Worker {
             UnboundedSender<UnixStreamEntry>,
             UnboundedReceiver<UnixStreamEntry>,
         ),
-        booter_signal: Sender<Result<WorkerMetricSource, Error>>,
+        booter_signal: Sender<Result<MetricSource, Error>>,
         termination_token: Option<TerminationToken>,
     ) {
         let worker_name = self.worker_name.clone();
@@ -103,11 +103,7 @@ impl Worker {
         let method_cloner = self.clone();
         let timing = opts.timing.take();
         let worker_kind = opts.conf.to_worker_kind();
-        let maybe_event_worker_metric_src = opts
-            .conf
-            .as_main_worker()
-            .as_ref()
-            .and_then(|it| it.event_worker_metric_src.clone());
+        let maybe_main_worker_opts = opts.conf.as_main_worker().cloned();
 
         let cancel = self.cancel.clone();
         let rt = if worker_kind.is_user_worker() {
@@ -125,24 +121,29 @@ impl Worker {
 
         let result = match DenoRuntime::new(opts).await {
             Ok(mut new_runtime) => {
-                let metric = {
+                let metric_src = {
                     let js_runtime = &mut new_runtime.js_runtime;
-                    let metric = WorkerMetricSource::from_js_runtime(js_runtime);
+                    let metric_src = WorkerMetricSource::from_js_runtime(js_runtime);
 
                     if worker_kind.is_main_worker() {
+                        let opts = maybe_main_worker_opts.unwrap();
                         let state = js_runtime.op_state();
                         let mut state_mut = state.borrow_mut();
 
-                        state_mut.put(RuntimeMetricSource::new(
-                            metric.clone(),
-                            maybe_event_worker_metric_src,
-                        ));
+                        let metric_src = RuntimeMetricSource::new(
+                            metric_src.clone(),
+                            opts.event_worker_metric_src
+                                .and_then(|it| it.into_worker().ok()),
+                            opts.shared_metric_src,
+                        );
+
+                        state_mut.put(metric_src.clone());
+                        MetricSource::Runtime(metric_src)
+                    } else {
+                        MetricSource::Worker(metric_src)
                     }
-
-                    metric
                 };
 
-                let _ = booter_signal.send(Ok(metric));
+                let _ = booter_signal.send(Ok(metric_src));
 
                 // CPU TIMER
                 let (termination_event_tx, termination_event_rx) =
```
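With this change the boot signal carries a `MetricSource` rather than a bare `WorkerMetricSource`: a main worker wraps a `RuntimeMetricSource` that aggregates its own source, the event worker's source, and the shared pool source, while every other worker reports a plain `WorkerMetricSource`. A plausible shape for the enum, inferred from the `MetricSource::Runtime`/`MetricSource::Worker` constructors and the `into_worker()` call above (the actual definition lives in `sb_core` and is not shown in this commit):

```rust
// Hedged sketch with placeholder stand-ins so it compiles on its own;
// the real WorkerMetricSource and RuntimeMetricSource live in sb_core.
#[derive(Clone)]
pub struct WorkerMetricSource;
#[derive(Clone)]
pub struct RuntimeMetricSource;

#[derive(Clone)]
pub enum MetricSource {
    Worker(WorkerMetricSource),
    Runtime(RuntimeMetricSource),
}

impl MetricSource {
    /// Extracts the plain worker source, as used when the main worker
    /// folds the event worker's source into its RuntimeMetricSource.
    /// Returning the enum on failure mirrors the `into_worker().ok()`
    /// call site; the exact signature is an assumption.
    pub fn into_worker(self) -> Result<WorkerMetricSource, Self> {
        match self {
            Self::Worker(src) => Ok(src),
            other => Err(other),
        }
    }
}
```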
116 changes: 59 additions & 57 deletions crates/base/src/rt_worker/worker_ctx.rs
```diff
@@ -12,7 +12,7 @@ use event_worker::events::{
 use hyper::{Body, Request, Response};
 use log::{debug, error};
 use sb_core::conn_sync::ConnSync;
-use sb_core::WorkerMetricSource;
+use sb_core::{MetricSource, SharedMetricSource};
 use sb_graph::EszipPayloadKind;
 use sb_workers::context::{
     EventWorkerRuntimeOpts, MainWorkerRuntimeOpts, Timing, UserWorkerMsgs, WorkerContextInitOpts,
@@ -310,10 +310,10 @@ impl CreateWorkerArgs {
 
 pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
     init_opts: Opt,
-) -> Result<(WorkerMetricSource, mpsc::UnboundedSender<WorkerRequestMsg>), Error> {
+) -> Result<(MetricSource, mpsc::UnboundedSender<WorkerRequestMsg>), Error> {
     let (unix_stream_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
     let (worker_boot_result_tx, worker_boot_result_rx) =
-        oneshot::channel::<Result<WorkerMetricSource, Error>>();
+        oneshot::channel::<Result<MetricSource, Error>>();
 
     let CreateWorkerArgs(init_opts, maybe_supervisor_policy, maybe_termination_token) =
         init_opts.into();
@@ -464,13 +464,7 @@ pub async fn create_events_worker(
     no_module_cache: bool,
     maybe_entrypoint: Option<String>,
     termination_token: Option<TerminationToken>,
-) -> Result<
-    (
-        WorkerMetricSource,
-        mpsc::UnboundedSender<WorkerEventWithMetadata>,
-    ),
-    Error,
-> {
+) -> Result<(MetricSource, mpsc::UnboundedSender<WorkerEventWithMetadata>), Error> {
     let (events_tx, events_rx) = mpsc::unbounded_channel::<WorkerEventWithMetadata>();
 
     let mut service_path = events_worker_path.clone();
@@ -509,71 +503,79 @@ pub async fn create_user_worker_pool(
     policy: WorkerPoolPolicy,
     worker_event_sender: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>>,
     termination_token: Option<TerminationToken>,
-) -> Result<mpsc::UnboundedSender<UserWorkerMsgs>, Error> {
+) -> Result<(SharedMetricSource, mpsc::UnboundedSender<UserWorkerMsgs>), Error> {
+    let metric_src = SharedMetricSource::default();
     let (user_worker_msgs_tx, mut user_worker_msgs_rx) =
         mpsc::unbounded_channel::<UserWorkerMsgs>();
 
     let user_worker_msgs_tx_clone = user_worker_msgs_tx.clone();
 
-    let _handle: tokio::task::JoinHandle<Result<(), Error>> = tokio::spawn(async move {
-        let token = termination_token.as_ref();
-        let mut termination_requested = false;
-        let mut worker_pool =
-            WorkerPool::new(policy, worker_event_sender, user_worker_msgs_tx_clone);
-
-        // Note: Keep this loop non-blocking. Spawn a task to run blocking calls.
-        // Handle errors within tasks and log them - do not bubble up errors.
-        loop {
-            tokio::select! {
-                _ = async {
-                    if let Some(token) = token {
-                        token.inbound.cancelled().await;
-                    } else {
-                        pending::<()>().await;
-                    }
-                }, if !termination_requested => {
-                    termination_requested = true;
-
-                    if worker_pool.user_workers.is_empty() {
-                        if let Some(token) = token {
-                            token.outbound.cancel();
-                        }
-
-                        break;
-                    }
-                }
-
-                msg = user_worker_msgs_rx.recv() => {
-                    match msg {
-                        None => break,
-                        Some(UserWorkerMsgs::Create(worker_options, tx)) => {
-                            worker_pool.create_user_worker(worker_options, tx, termination_token.as_ref().map(|it| it.child_token()));
-                        }
-                        Some(UserWorkerMsgs::Created(key, profile)) => {
-                            worker_pool.add_user_worker(key, profile);
-                        }
-                        Some(UserWorkerMsgs::SendRequest(key, req, res_tx, conn_watch)) => {
-                            worker_pool.send_request(&key, req, res_tx, conn_watch);
-                        }
-                        Some(UserWorkerMsgs::Idle(key)) => {
-                            worker_pool.idle(&key);
-                        }
-                        Some(UserWorkerMsgs::Shutdown(key)) => {
-                            worker_pool.shutdown(&key);
-
-                            if let Some(token) = token {
-                                if token.inbound.is_cancelled() && worker_pool.user_workers.is_empty() {
-                                    token.outbound.cancel();
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        Ok(())
-    });
+    let _handle: tokio::task::JoinHandle<Result<(), Error>> = tokio::spawn({
+        let metric_src_inner = metric_src.clone();
+        async move {
+            let token = termination_token.as_ref();
+            let mut termination_requested = false;
+            let mut worker_pool = WorkerPool::new(
+                policy,
+                metric_src_inner,
+                worker_event_sender,
+                user_worker_msgs_tx_clone,
+            );
+
+            // Note: Keep this loop non-blocking. Spawn a task to run blocking calls.
+            // Handle errors within tasks and log them - do not bubble up errors.
+            loop {
+                tokio::select! {
+                    _ = async {
+                        if let Some(token) = token {
+                            token.inbound.cancelled().await;
+                        } else {
+                            pending::<()>().await;
+                        }
+                    }, if !termination_requested => {
+                        termination_requested = true;
+
+                        if worker_pool.user_workers.is_empty() {
+                            if let Some(token) = token {
+                                token.outbound.cancel();
+                            }
+
+                            break;
+                        }
+                    }
+
+                    msg = user_worker_msgs_rx.recv() => {
+                        match msg {
+                            None => break,
+                            Some(UserWorkerMsgs::Create(worker_options, tx)) => {
+                                worker_pool.create_user_worker(worker_options, tx, termination_token.as_ref().map(|it| it.child_token()));
+                            }
+                            Some(UserWorkerMsgs::Created(key, profile)) => {
+                                worker_pool.add_user_worker(key, profile);
+                            }
+                            Some(UserWorkerMsgs::SendRequest(key, req, res_tx, conn_watch)) => {
+                                worker_pool.send_request(&key, req, res_tx, conn_watch);
+                            }
+                            Some(UserWorkerMsgs::Idle(key)) => {
+                                worker_pool.idle(&key);
+                            }
+                            Some(UserWorkerMsgs::Shutdown(key)) => {
+                                worker_pool.shutdown(&key);
+
+                                if let Some(token) = token {
+                                    if token.inbound.is_cancelled() && worker_pool.user_workers.is_empty() {
+                                        token.outbound.cancel();
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
+            Ok(())
+        }
+    });
 
-    Ok(user_worker_msgs_tx)
+    Ok((metric_src, user_worker_msgs_tx))
 }
```
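`create_user_worker_pool` now returns the `SharedMetricSource` alongside the message channel, so a caller can thread the same counters into the main worker that serves `EdgeRuntime.getRuntimeMetrics`. A sketch of how a call site might adapt (the surrounding setup, `policy`, `events_tx`, and `termination_token`, is assumed rather than shown in this commit):

```rust
// Hedged sketch of a caller consuming the new (SharedMetricSource, sender) tuple.
let (shared_metric_src, worker_pool_tx) =
    create_user_worker_pool(policy, Some(events_tx), termination_token).await?;

// Hand the shared source to the main worker so runtime metrics can
// report pool-wide counts alongside the worker's own sources.
let conf = WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
    worker_pool_tx,
    shared_metric_src: Some(shared_metric_src),
    event_worker_metric_src: None,
});
```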
8 changes: 8 additions & 0 deletions crates/base/src/rt_worker/worker_pool.rs
```diff
@@ -7,6 +7,7 @@ use hyper::Body;
 use log::error;
 use sb_core::conn_sync::ConnSync;
 use sb_core::util::sync::AtomicFlag;
+use sb_core::SharedMetricSource;
 use sb_workers::context::{
     CreateUserWorkerResult, SendRequestResult, Timing, TimingStatus, UserWorkerMsgs,
     UserWorkerProfile, WorkerContextInitOpts, WorkerRuntimeOpts,
@@ -203,6 +204,7 @@ impl ActiveWorkerRegistry {
 // send_request is called with UUID
 pub struct WorkerPool {
     pub policy: WorkerPoolPolicy,
+    pub metric_src: SharedMetricSource,
     pub user_workers: HashMap<Uuid, UserWorkerProfile>,
     pub active_workers: HashMap<String, ActiveWorkerRegistry>,
     pub worker_pool_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
@@ -214,11 +216,13 @@ pub struct WorkerPool {
 impl WorkerPool {
     pub(crate) fn new(
         policy: WorkerPoolPolicy,
+        metric_src: SharedMetricSource,
         worker_event_sender: Option<UnboundedSender<WorkerEventWithMetadata>>,
         worker_pool_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
     ) -> Self {
         Self {
             policy,
+            metric_src,
             worker_event_sender,
             user_workers: HashMap::new(),
             active_workers: HashMap::new(),
@@ -447,6 +451,7 @@ impl WorkerPool {
             .insert(WorkerId(key, self.policy.supervisor_policy.is_per_worker()));
 
         self.user_workers.insert(key, profile);
+        self.metric_src.incl_active_user_workers();
     }
 
     pub fn send_request(
@@ -552,6 +557,8 @@ impl WorkerPool {
         };
 
         let _ = notify_tx.send(None);
+
+        self.metric_src.decl_active_user_workers();
     }
 
     fn retire(&mut self, key: &Uuid) {
@@ -570,6 +577,7 @@ impl WorkerPool {
 
         if registry.workers.contains(key) {
             registry.workers.remove(key);
+            self.metric_src.incl_retired_user_worker();
         }
     }
 }
```
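The pool only ever calls `incl_active_user_workers`, `decl_active_user_workers`, and `incl_retired_user_worker` on the source, and it clones it freely across tasks, which points at cheap, clonable shared counters. A minimal sketch of how such a source could be built (the real `sb_core` definition is not in the rendered portion of this commit):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hedged sketch: method names match the call sites in worker_pool.rs;
// the Arc<AtomicUsize> representation is an assumption.
#[derive(Clone, Default)]
pub struct SharedMetricSource {
    active_user_workers: Arc<AtomicUsize>,
    retired_user_workers: Arc<AtomicUsize>,
}

impl SharedMetricSource {
    pub fn incl_active_user_workers(&self) {
        self.active_user_workers.fetch_add(1, Ordering::Relaxed);
    }

    pub fn decl_active_user_workers(&self) {
        self.active_user_workers.fetch_sub(1, Ordering::Relaxed);
    }

    pub fn incl_retired_user_worker(&self) {
        self.retired_user_workers.fetch_add(1, Ordering::Relaxed);
    }
}
```

Because every clone shares the same atomics, the main worker can read a consistent snapshot of the pool while the pool mutates the counters from its own task.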
(Diffs for the remaining 9 changed files did not load.)
