Skip to content

Commit

Permalink
Merge pull request supabase#264 from nyannyacha/feat-metric-api
Browse files Browse the repository at this point in the history
feat: runtime metric API
  • Loading branch information
laktek authored Feb 13, 2024
2 parents dcaaf35 + cb9dcea commit 705db6e
Show file tree
Hide file tree
Showing 14 changed files with 487 additions and 111 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 30 additions & 7 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ impl DenoRuntime {
// the task from the other threads.
// let mut current_thread_id = std::thread::current().id();

let result = match poll_fn(|cx| {
let poll_result = poll_fn(|cx| {
// INVARIANT: Only can steal current task by other threads when LIFO
// task scheduler heuristic disabled. Turning off the heuristic is
// unstable now, so it's not considered.
Expand Down Expand Up @@ -487,8 +487,9 @@ impl DenoRuntime {

poll_result
})
.await
{
.await;

let result = match poll_result {
Err(err) => Err(anyhow!("event loop error: {}", err)),
Ok(_) => match mod_result_rx.await {
Err(e) => {
Expand Down Expand Up @@ -573,7 +574,13 @@ mod test {
maybe_module_code: Some(FastString::from(String::from(
"Deno.serve((req) => new Response('Hello World'));",
))),
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
conf: {
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
worker_pool_tx,
shared_metric_src: None,
event_worker_metric_src: None,
})
},
})
.await
.expect("It should not panic");
Expand Down Expand Up @@ -612,7 +619,13 @@ mod test {
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
maybe_entrypoint: None,
maybe_module_code: None,
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
conf: {
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
worker_pool_tx,
shared_metric_src: None,
event_worker_metric_src: None,
})
},
})
.await;

Expand Down Expand Up @@ -673,7 +686,13 @@ mod test {
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
maybe_entrypoint: None,
maybe_module_code: None,
conf: { WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) },
conf: {
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
worker_pool_tx,
shared_metric_src: None,
event_worker_metric_src: None,
})
},
})
.await;

Expand Down Expand Up @@ -731,7 +750,11 @@ mod test {
if let Some(uc) = user_conf {
uc
} else {
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx })
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
worker_pool_tx,
shared_metric_src: None,
event_worker_metric_src: None,
})
}
},
})
Expand Down
39 changes: 32 additions & 7 deletions crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use event_worker::events::{
use futures_util::FutureExt;
use log::{debug, error};
use sb_core::conn_sync::ConnSync;
use sb_core::{MetricSource, RuntimeMetricSource, WorkerMetricSource};
use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts};
use std::any::Any;
use std::future::{pending, Future};
Expand Down Expand Up @@ -87,7 +88,7 @@ impl Worker {
UnboundedSender<UnixStreamEntry>,
UnboundedReceiver<UnixStreamEntry>,
),
booter_signal: Sender<Result<(), Error>>,
booter_signal: Sender<Result<MetricSource, Error>>,
termination_token: Option<TerminationToken>,
) {
let worker_name = self.worker_name.clone();
Expand All @@ -101,24 +102,48 @@ impl Worker {

let method_cloner = self.clone();
let timing = opts.timing.take();
let is_user_worker = opts.conf.is_user_worker();
let worker_kind = opts.conf.to_worker_kind();
let maybe_main_worker_opts = opts.conf.as_main_worker().cloned();

let cancel = self.cancel.clone();
let rt = if is_user_worker {
let rt = if worker_kind.is_user_worker() {
&rt::USER_WORKER_RT
} else {
&rt::PRIMARY_WORKER_RT
};

let _worker_handle = rt.spawn_pinned(move || {
tokio::task::spawn_local(async move {
let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = is_user_worker
let (maybe_cpu_usage_metrics_tx, maybe_cpu_usage_metrics_rx) = worker_kind
.is_user_worker()
.then(unbounded_channel::<CPUUsageMetrics>)
.unzip();

let result = match DenoRuntime::new(opts).await {
Ok(mut new_runtime) => {
let _ = booter_signal.send(Ok(()));
let metric_src = {
let js_runtime = &mut new_runtime.js_runtime;
let metric_src = WorkerMetricSource::from_js_runtime(js_runtime);

if worker_kind.is_main_worker() {
let opts = maybe_main_worker_opts.unwrap();
let state = js_runtime.op_state();
let mut state_mut = state.borrow_mut();
let metric_src = RuntimeMetricSource::new(
metric_src.clone(),
opts.event_worker_metric_src
.and_then(|it| it.into_worker().ok()),
opts.shared_metric_src,
);

state_mut.put(metric_src.clone());
MetricSource::Runtime(metric_src)
} else {
MetricSource::Worker(metric_src)
}
};

let _ = booter_signal.send(Ok(metric_src));

// CPU TIMER
let (termination_event_tx, termination_event_rx) =
Expand All @@ -127,7 +152,7 @@ impl Worker {
let _cpu_timer;

// TODO: Allow customization of supervisor
let termination_fut = if is_user_worker {
let termination_fut = if worker_kind.is_user_worker() {
// cputimer is returned from supervisor and assigned here to keep it in scope.
let Ok(maybe_timer) = create_supervisor(
worker_key.unwrap_or(Uuid::nil()),
Expand Down Expand Up @@ -209,7 +234,7 @@ impl Worker {
let result = data.await;

if let Some(token) = termination_token.as_ref() {
if !is_user_worker {
if !worker_kind.is_user_worker() {
let _ = termination_fut.await;
}

Expand Down
Loading

0 comments on commit 705db6e

Please sign in to comment.