diff --git a/Cargo.lock b/Cargo.lock index 3c40d204..07707f14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -303,6 +303,7 @@ dependencies = [ "tokio", "url", "urlencoding", + "uuid", ] [[package]] diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index 8a2f261d..cf8a62f8 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -41,6 +41,7 @@ sb_env = { version = "0.1.0", path = "../sb_env" } sb_core = { version = "0.1.0", path = "../sb_core" } sb_os = { version = "0.1.0", path = "../sb_os" } urlencoding = { version = "2.1.2" } +uuid = { workspace = true } [dev-dependencies] futures-util = { version = "0.3.28" } diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index b05bfbc7..aab26ef8 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -25,10 +25,8 @@ use sb_core::permissions::{sb_core_permissions, Permissions}; use sb_core::runtime::sb_core_runtime; use sb_core::sb_core_main_js; use sb_env::sb_env as sb_env_op; -use sb_worker_context::essentials::{ - EventWorkerRuntimeOpts, UserWorkerMsgs, WorkerContextInitOpts, WorkerRuntimeOpts, -}; -use sb_worker_context::events::WorkerEvents; +use sb_worker_context::essentials::{UserWorkerMsgs, WorkerContextInitOpts, WorkerRuntimeOpts}; +use sb_worker_context::events::WorkerEventWithMetadata; use sb_workers::events::sb_user_event_worker; use sb_workers::sb_user_workers; @@ -115,26 +113,16 @@ pub struct DenoRuntime { } impl DenoRuntime { - pub async fn new( - opts: WorkerContextInitOpts, - event_manager_opts: Option, // TODO: refactor this be part of opts - ) -> Result { - let mut maybe_events_msg_tx: Option> = None; - + pub async fn new(opts: WorkerContextInitOpts) -> Result { let WorkerContextInitOpts { service_path, no_module_cache, import_map_path, env_vars, + events_rx, conf, } = opts; - if conf.is_user_worker() { - if let Some(events_msg_tx) = conf.as_user_worker().unwrap().events_msg_tx.clone() { - maybe_events_msg_tx = Some(events_msg_tx) - } - } - set_v8_flags(); let user_agent = "supabase-edge-runtime".to_string(); @@ -237,17 +225,14 @@ impl DenoRuntime { op_state.put::(env_vars); if conf.is_events_worker() { - if let WorkerRuntimeOpts::EventsWorker = conf.clone() { - // We unwrap because event_manager_opts must always be present when type is `EventsWorker` - op_state.put::>( - event_manager_opts.unwrap().event_rx, - ); - } + // if worker is an events worker, assert events_rx is to be available + op_state + .put::>(events_rx.unwrap()); } if conf.is_user_worker() { - if let Some(events_msg_tx) = maybe_events_msg_tx.clone() { - op_state.put::>(events_msg_tx); + if let Some(events_msg_tx) = conf.as_user_worker().unwrap().events_msg_tx.clone() { + op_state.put::>(events_msg_tx); } } } @@ -262,13 +247,6 @@ impl DenoRuntime { }) } - pub fn event_sender(&self) -> Option> { - self.conf - .as_user_worker() - .map(|c| c.events_msg_tx.clone()) - .unwrap() - } - pub async fn run( mut self, unix_stream_rx: mpsc::UnboundedReceiver, @@ -353,22 +331,20 @@ mod test { ) -> DenoRuntime { let (worker_pool_tx, _) = mpsc::unbounded_channel::(); - DenoRuntime::new( - WorkerContextInitOpts { - service_path: path.unwrap_or(PathBuf::from("./test_cases/main")), - no_module_cache: false, - import_map_path: None, - env_vars: env_vars.unwrap_or(Default::default()), - conf: { - if let Some(uc) = user_conf { - uc - } else { - WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) - } - }, + DenoRuntime::new(WorkerContextInitOpts { + service_path: path.unwrap_or(PathBuf::from("./test_cases/main")), + no_module_cache: false, + import_map_path: None, + env_vars: env_vars.unwrap_or(Default::default()), + events_rx: None, + conf: { + if let Some(uc) = user_conf { + uc + } else { + WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx }) + } }, - None, - ) + }) .await .unwrap() } @@ -624,6 +600,8 @@ mod test { key: None, pool_msg_tx: None, events_msg_tx: None, + execution_id: None, + service_path: None, })), ) .await diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 6775e0f6..39250439 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -1,5 +1,5 @@ use crate::worker_ctx::{ - create_event_worker, create_user_worker_pool, create_worker, WorkerRequestMsg, + create_events_worker, create_user_worker_pool, create_worker, WorkerRequestMsg, }; use anyhow::Error; use hyper::{server::conn::Http, service::Service, Body, Request, Response}; @@ -7,7 +7,7 @@ use log::{debug, error, info}; use sb_worker_context::essentials::{ MainWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRuntimeOpts, }; -use sb_worker_context::events::WorkerEvents; +use sb_worker_context::events::WorkerEventWithMetadata; use std::future::Future; use std::net::IpAddr; use std::net::Ipv4Addr; @@ -92,38 +92,36 @@ impl Server { no_module_cache: bool, callback_tx: Option>, ) -> Result { - let mut worker_event_sender: Option> = None; + let mut worker_events_sender: Option> = None; // Create Event Worker if let Some(events_service_path) = maybe_events_service_path { let events_path = Path::new(&events_service_path); let events_path_buf = events_path.to_path_buf(); - let event_worker = - create_event_worker(events_path_buf, import_map_path.clone(), no_module_cache) + let events_worker = + create_events_worker(events_path_buf, import_map_path.clone(), no_module_cache) .await .expect("Event worker could not be created"); - worker_event_sender = Some(event_worker); + worker_events_sender = Some(events_worker); } // Create a user worker pool - let user_worker_msgs_tx = create_user_worker_pool(worker_event_sender).await?; + let user_worker_msgs_tx = create_user_worker_pool(worker_events_sender).await?; // create main worker let main_path = Path::new(&main_service_path); - let main_worker_req_tx = create_worker( - WorkerContextInitOpts { - service_path: main_path.to_path_buf(), - import_map_path, - no_module_cache, - conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { - worker_pool_tx: user_worker_msgs_tx, - }), - env_vars: std::env::vars().collect(), - }, - None, - ) + let main_worker_req_tx = create_worker(WorkerContextInitOpts { + service_path: main_path.to_path_buf(), + import_map_path, + no_module_cache, + events_rx: None, + conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { + worker_pool_tx: user_worker_msgs_tx, + }), + env_vars: std::env::vars().collect(), + }) .await?; // register alarm signal handler diff --git a/crates/base/src/utils.rs b/crates/base/src/utils.rs index d84950e8..18473a67 100644 --- a/crates/base/src/utils.rs +++ b/crates/base/src/utils.rs @@ -1,13 +1,14 @@ -use sb_worker_context::events::WorkerEvents; +use sb_worker_context::events::{EventMetadata, WorkerEventWithMetadata, WorkerEvents}; use tokio::sync::mpsc; pub mod units; pub fn send_event_if_event_manager_available( - maybe_event_manager: Option>, + maybe_event_manager: Option>, event: WorkerEvents, + metadata: EventMetadata, ) { if let Some(event_manager) = maybe_event_manager { - let _ = event_manager.send(event); + let _ = event_manager.send(WorkerEventWithMetadata { event, metadata }); } } diff --git a/crates/base/src/worker_ctx.rs b/crates/base/src/worker_ctx.rs index 6329c702..08318342 100644 --- a/crates/base/src/worker_ctx.rs +++ b/crates/base/src/worker_ctx.rs @@ -12,7 +12,8 @@ use sb_worker_context::essentials::{ WorkerRuntimeOpts, }; use sb_worker_context::events::{ - BootEvent, BootFailure, LogEvent, LogLevel, PseudoEvent, UncaughtException, WorkerEvents, + BootEvent, BootFailure, EventMetadata, LogEvent, LogLevel, PseudoEvent, UncaughtException, + WorkerEventWithMetadata, WorkerEvents, }; use std::collections::HashMap; use std::path::PathBuf; @@ -30,7 +31,6 @@ pub struct WorkerRequestMsg { #[derive(Debug, Clone)] pub struct UserWorkerProfile { worker_event_tx: mpsc::UnboundedSender, - event_manager_tx: Option>, } async fn handle_request( @@ -150,9 +150,24 @@ fn create_supervisor( Ok(cputimer) } +fn get_event_metadata(conf: &WorkerRuntimeOpts) -> EventMetadata { + let mut event_metadata = EventMetadata { + service_path: None, + execution_id: None, + }; + if conf.is_user_worker() { + let conf = conf.as_user_worker().unwrap(); + event_metadata = EventMetadata { + service_path: conf.service_path.clone(), + execution_id: conf.execution_id, + }; + } + + event_metadata +} + pub async fn create_worker( init_opts: WorkerContextInitOpts, - event_manager_opts: Option, ) -> Result, Error> { let service_path = init_opts.service_path.clone(); @@ -163,7 +178,7 @@ pub async fn create_worker( let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel::>(); let (unix_stream_tx, unix_stream_rx) = mpsc::unbounded_channel::(); - let (worker_key, pool_msg_tx, event_msg_tx, thread_name) = match &init_opts.conf { + let (worker_key, pool_msg_tx, events_msg_tx, thread_name) = match &init_opts.conf { WorkerRuntimeOpts::UserWorker(worker_opts) => ( worker_opts.key, worker_opts.pool_msg_tx.clone(), @@ -174,9 +189,14 @@ pub async fn create_worker( .unwrap_or("isolate-worker-unknown".to_string()), ), WorkerRuntimeOpts::MainWorker(_) => (None, None, None, "main-worker".to_string()), - WorkerRuntimeOpts::EventsWorker => (None, None, None, "events-worker".to_string()), + WorkerRuntimeOpts::EventsWorker(_) => (None, None, None, "events-worker".to_string()), }; + let event_metadata = get_event_metadata(&init_opts.conf); + + let worker_boot_start_time = Instant::now(); + let events_msg_tx_clone = events_msg_tx.clone(); + let event_metadata_clone = event_metadata.clone(); // spawn a thread to run the worker let _handle: thread::JoinHandle> = thread::Builder::new() .name(thread_name) @@ -189,8 +209,9 @@ pub async fn create_worker( let mut start_time = 0; let result: Result = local.block_on(&runtime, async { - match DenoRuntime::new(init_opts, event_manager_opts).await { + match DenoRuntime::new(init_opts).await { Err(err) => { + println!("{}", err); let _ = worker_boot_result_tx.send(Err(anyhow!("worker boot error"))); Ok(WorkerEvents::BootFailure(BootFailure { msg: err.to_string(), @@ -235,18 +256,23 @@ pub async fn create_worker( match result { Ok(event) => { - send_event_if_event_manager_available(event_msg_tx.clone(), event); + send_event_if_event_manager_available( + events_msg_tx_clone.clone(), + event, + event_metadata_clone.clone(), + ); } Err(err) => error!("unexpected worker error {}", err), }; let end_time = get_thread_time()?; send_event_if_event_manager_available( - event_msg_tx.clone(), + events_msg_tx_clone.clone(), WorkerEvents::Log(LogEvent { msg: format!("CPU time used: {:?}ms", (end_time - start_time) / 1_000_000), level: LogLevel::Info, }), + event_metadata_clone.clone(), ); // remove the worker from pool @@ -292,7 +318,17 @@ pub async fn create_worker( worker_req_handle.abort(); bail!(err) } - Ok(_) => Ok(worker_req_tx), + Ok(_) => { + let elapsed = worker_boot_start_time.elapsed().as_millis(); + send_event_if_event_manager_available( + events_msg_tx, + WorkerEvents::Boot(BootEvent { + boot_time: elapsed as usize, + }), + event_metadata, + ); + Ok(worker_req_tx) + } } } @@ -314,30 +350,28 @@ async fn send_user_worker_request( Ok(res) } -pub async fn create_event_worker( - event_worker_path: PathBuf, +pub async fn create_events_worker( + events_worker_path: PathBuf, import_map_path: Option, no_module_cache: bool, -) -> Result, Error> { - let (event_tx, event_rx) = mpsc::unbounded_channel::(); - - let _ = create_worker( - WorkerContextInitOpts { - service_path: event_worker_path, - no_module_cache, - import_map_path, - env_vars: std::env::vars().collect(), - conf: WorkerRuntimeOpts::EventsWorker, - }, - Some(EventWorkerRuntimeOpts { event_rx }), - ) +) -> Result, Error> { + let (events_tx, events_rx) = mpsc::unbounded_channel::(); + + let _ = create_worker(WorkerContextInitOpts { + service_path: events_worker_path, + no_module_cache, + import_map_path, + env_vars: std::env::vars().collect(), + events_rx: Some(events_rx), + conf: WorkerRuntimeOpts::EventsWorker(EventWorkerRuntimeOpts {}), + }) .await?; - Ok(event_tx) + Ok(events_tx) } pub async fn create_user_worker_pool( - worker_event_sender: Option>, + worker_event_sender: Option>, ) -> Result, Error> { let (user_worker_msgs_tx, mut user_worker_msgs_rx) = mpsc::unbounded_channel::(); @@ -376,30 +410,20 @@ pub async fn create_user_worker_pool( } } + user_worker_rt_opts.service_path = Some(service_path.to_string()); user_worker_rt_opts.key = Some(key); + user_worker_rt_opts.execution_id = Some(uuid::Uuid::new_v4()); user_worker_rt_opts.pool_msg_tx = Some(user_worker_msgs_tx_clone.clone()); user_worker_rt_opts.events_msg_tx = worker_event_sender.clone(); worker_options.conf = WorkerRuntimeOpts::UserWorker(user_worker_rt_opts); - let now = Instant::now(); - let result = create_worker(worker_options, None).await; - let elapsed = now.elapsed().as_secs(); - - let event_manager = worker_event_sender.clone(); + let result = create_worker(worker_options).await; match result { Ok(user_worker_req_tx) => { - send_event_if_event_manager_available( - event_manager.clone(), - WorkerEvents::Boot(BootEvent { - boot_time: elapsed as usize, - }), - ); - user_workers.insert( key, UserWorkerProfile { worker_event_tx: user_worker_req_tx, - event_manager_tx: event_manager, }, ); if tx.send(Ok(CreateUserWorkerResult { key })).is_err() { @@ -423,11 +447,9 @@ pub async fn create_user_worker_pool( let result = match req { Ok(rep) => Ok(rep), Err(err) => { - send_event_if_event_manager_available( - profile.event_manager_tx, - WorkerEvents::UncaughtException(UncaughtException { - exception: err.to_string(), - }), + error!( + "failed to send request to user worker: {}", + err.to_string() ); Err(err) } diff --git a/crates/base/tests/import_map_tests.rs b/crates/base/tests/import_map_tests.rs index 9ad4282e..b858c9fe 100644 --- a/crates/base/tests/import_map_tests.rs +++ b/crates/base/tests/import_map_tests.rs @@ -17,9 +17,10 @@ async fn test_import_map_file_path() { no_module_cache: false, import_map_path: Some("./test_cases/with_import_map/import_map.json".to_string()), env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let req = Request::builder() @@ -64,9 +65,10 @@ async fn test_import_map_inline() { no_module_cache: false, import_map_path: Some(inline_import_map), env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let req = Request::builder() diff --git a/crates/base/tests/main_worker_tests.rs b/crates/base/tests/main_worker_tests.rs index d39e488f..4ed51b9c 100644 --- a/crates/base/tests/main_worker_tests.rs +++ b/crates/base/tests/main_worker_tests.rs @@ -15,11 +15,12 @@ async fn test_main_worker_options_request() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx: user_worker_msgs_tx, }), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let req = Request::builder() @@ -53,11 +54,12 @@ async fn test_main_worker_post_request() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx: user_worker_msgs_tx, }), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let body_chunk = "{ \"name\": \"bar\"}"; @@ -95,11 +97,12 @@ async fn test_main_worker_boot_error() { no_module_cache: false, import_map_path: Some("./non-existing-import-map.json".to_string()), env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx: user_worker_msgs_tx, }), }; - let result = create_worker(opts, None).await; + let result = create_worker(opts).await; assert!(result.is_err()); assert_eq!(result.unwrap_err().to_string(), "worker boot error"); diff --git a/crates/base/tests/null_body_status_null_body_tests.rs b/crates/base/tests/null_body_status_null_body_tests.rs index 494b104a..6dcea9a8 100644 --- a/crates/base/tests/null_body_status_null_body_tests.rs +++ b/crates/base/tests/null_body_status_null_body_tests.rs @@ -14,9 +14,10 @@ async fn test_null_body_with_204_status() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let req = Request::builder() @@ -47,9 +48,10 @@ async fn test_null_body_with_204_status_post() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let req = Request::builder() diff --git a/crates/base/tests/oak_user_worker_tests.rs b/crates/base/tests/oak_user_worker_tests.rs index f8e0a825..e2df32b0 100644 --- a/crates/base/tests/oak_user_worker_tests.rs +++ b/crates/base/tests/oak_user_worker_tests.rs @@ -17,9 +17,10 @@ async fn test_oak_server() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let req = Request::builder() @@ -50,9 +51,10 @@ async fn test_file_upload() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let body_chunk = "--TEST\r\nContent-Disposition: form-data; name=\"file\"; filename=\"test.txt\"\r\nContent-Type: text/plain\r\n\r\ntestuser\r\n--TEST--\r\n"; diff --git a/crates/base/tests/tls_invalid_data_tests.rs b/crates/base/tests/tls_invalid_data_tests.rs index e36619d1..2a00b293 100644 --- a/crates/base/tests/tls_invalid_data_tests.rs +++ b/crates/base/tests/tls_invalid_data_tests.rs @@ -14,9 +14,10 @@ async fn test_tls_throw_invalid_data() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let req = Request::builder() diff --git a/crates/base/tests/user_worker_tests.rs b/crates/base/tests/user_worker_tests.rs index 0516169e..977ca94c 100644 --- a/crates/base/tests/user_worker_tests.rs +++ b/crates/base/tests/user_worker_tests.rs @@ -14,9 +14,10 @@ async fn test_user_worker_json_imports() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let worker_req_tx = create_worker(opts, None).await.unwrap(); + let worker_req_tx = create_worker(opts).await.unwrap(); let (res_tx, res_rx) = oneshot::channel::, hyper::Error>>(); let req = Request::builder() diff --git a/crates/base/tests/worker_boot_tests.rs b/crates/base/tests/worker_boot_tests.rs index 5e278013..8ed791ba 100644 --- a/crates/base/tests/worker_boot_tests.rs +++ b/crates/base/tests/worker_boot_tests.rs @@ -13,9 +13,10 @@ async fn test_worker_boot_invalid_imports() { no_module_cache: false, import_map_path: None, env_vars: HashMap::new(), + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(user_rt_opts), }; - let result = create_worker(opts, None).await; + let result = create_worker(opts).await; assert!(result.is_err()); assert_eq!(result.unwrap_err().to_string(), "worker boot error"); diff --git a/crates/sb_worker_context/Cargo.toml b/crates/sb_worker_context/Cargo.toml index 95ba07ea..10739a0c 100644 --- a/crates/sb_worker_context/Cargo.toml +++ b/crates/sb_worker_context/Cargo.toml @@ -11,9 +11,9 @@ license = "MIT" path = "lib.rs" [dependencies] +anyhow = { workspace = true } +enum-as-inner = "0.6.0" hyper.workspace = true +serde.workspace = true tokio.workspace = true uuid.workspace = true -serde.workspace = true -anyhow = { workspace = true } -enum-as-inner = "0.6.0" diff --git a/crates/sb_worker_context/essentials.rs b/crates/sb_worker_context/essentials.rs index 78ac392b..d07a6755 100644 --- a/crates/sb_worker_context/essentials.rs +++ b/crates/sb_worker_context/essentials.rs @@ -1,17 +1,20 @@ -use crate::events::WorkerEvents; +use crate::events::WorkerEventWithMetadata; use anyhow::Error; use enum_as_inner::EnumAsInner; use hyper::{Body, Request, Response}; use std::collections::HashMap; use std::path::PathBuf; use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; #[derive(Debug, Clone)] pub struct UserWorkerRuntimeOpts { - pub key: Option, + pub service_path: Option, + pub key: Option, // unique worker key based on service path hash + pub execution_id: Option, pub pool_msg_tx: Option>, - pub events_msg_tx: Option>, + pub events_msg_tx: Option>, pub memory_limit_mb: u64, pub low_memory_multiplier: u64, @@ -43,6 +46,8 @@ impl Default for UserWorkerRuntimeOpts { events_msg_tx: None, allow_remote_modules: true, custom_module_root: None, + service_path: None, + execution_id: None, } } } @@ -52,24 +57,23 @@ pub struct MainWorkerRuntimeOpts { pub worker_pool_tx: mpsc::UnboundedSender, } -#[derive(Debug)] -pub struct EventWorkerRuntimeOpts { - pub event_rx: mpsc::UnboundedReceiver, -} +#[derive(Debug, Clone)] +pub struct EventWorkerRuntimeOpts {} #[derive(Debug, Clone, EnumAsInner)] pub enum WorkerRuntimeOpts { UserWorker(UserWorkerRuntimeOpts), MainWorker(MainWorkerRuntimeOpts), - EventsWorker, + EventsWorker(EventWorkerRuntimeOpts), } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct WorkerContextInitOpts { pub service_path: PathBuf, pub no_module_cache: bool, pub import_map_path: Option, pub env_vars: HashMap, + pub events_rx: Option>, pub conf: WorkerRuntimeOpts, } diff --git a/crates/sb_worker_context/events.rs b/crates/sb_worker_context/events.rs index e4fe75e0..adbc9b67 100644 --- a/crates/sb_worker_context/events.rs +++ b/crates/sb_worker_context/events.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use uuid::Uuid; #[derive(Serialize, Deserialize, Debug)] pub struct PseudoEvent {} @@ -18,6 +19,12 @@ pub struct UncaughtException { pub exception: String, } +#[derive(Serialize, Deserialize, Debug)] +pub struct LogEvent { + pub msg: String, + pub level: LogLevel, +} + #[derive(Serialize, Deserialize, Debug)] pub enum LogLevel { Debug, @@ -26,12 +33,6 @@ pub enum LogLevel { Error, } -#[derive(Serialize, Deserialize, Debug)] -pub struct LogEvent { - pub msg: String, - pub level: LogLevel, -} - #[derive(Serialize, Deserialize, Debug)] pub enum WorkerEvents { Boot(BootEvent), @@ -43,3 +44,15 @@ pub enum WorkerEvents { EventLoopCompleted(PseudoEvent), Log(LogEvent), } + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EventMetadata { + pub service_path: Option, + pub execution_id: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct WorkerEventWithMetadata { + pub event: WorkerEvents, + pub metadata: EventMetadata, +} diff --git a/crates/sb_workers/event_worker.js b/crates/sb_workers/event_worker.js index 50753042..54200637 100644 --- a/crates/sb_workers/event_worker.js +++ b/crates/sb_workers/event_worker.js @@ -11,11 +11,12 @@ class SupabaseEventListener { let value = undefined; if (!done) { const rawEvent = reqEvt['Event']; - const eventType = Object.keys(rawEvent)[0]; + const eventType = Object.keys(rawEvent.event)[0]; value = { timestamp: new Date().toISOString(), event_type: eventType, - event: rawEvent[eventType], + event: rawEvent.event[eventType], + metadata: rawEvent.metadata, }; } diff --git a/crates/sb_workers/events.rs b/crates/sb_workers/events.rs index e36f488d..25f07e26 100644 --- a/crates/sb_workers/events.rs +++ b/crates/sb_workers/events.rs @@ -1,7 +1,7 @@ use anyhow::{bail, Error}; use deno_core::op; use deno_core::OpState; -use sb_worker_context::events::WorkerEvents; +use sb_worker_context::events::WorkerEventWithMetadata; use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::rc::Rc; @@ -9,7 +9,7 @@ use tokio::sync::mpsc; #[derive(Serialize, Deserialize)] pub enum RawEvent { - Event(WorkerEvents), + Event(WorkerEventWithMetadata), Done, } @@ -25,7 +25,7 @@ pub struct IncomingEvent { async fn op_event_accept(state: Rc>) -> Result { let rx = { let mut op_state = state.borrow_mut(); - op_state.try_take::>() + op_state.try_take::>() }; if rx.is_none() { bail!("events worker receiver not available") @@ -35,7 +35,7 @@ async fn op_event_accept(state: Rc>) -> Result let data = rx.recv().await; let mut op_state = state.borrow_mut(); - op_state.put::>(rx); + op_state.put::>(rx); match data { Some(event) => Ok(RawEvent::Event(event)), diff --git a/crates/sb_workers/lib.rs b/crates/sb_workers/lib.rs index 426402b2..99eff2c2 100644 --- a/crates/sb_workers/lib.rs +++ b/crates/sb_workers/lib.rs @@ -94,6 +94,7 @@ pub async fn op_user_worker_create( no_module_cache, import_map_path, env_vars: env_vars_map, + events_rx: None, conf: WorkerRuntimeOpts::UserWorker(UserWorkerRuntimeOpts { memory_limit_mb, low_memory_multiplier, @@ -107,6 +108,8 @@ pub async fn op_user_worker_create( key: None, pool_msg_tx: None, events_msg_tx: None, + service_path: None, + execution_id: None, }), }; diff --git a/examples/event-manager/index.ts b/examples/event-manager/index.ts index e6b3b74b..36820500 100644 --- a/examples/event-manager/index.ts +++ b/examples/event-manager/index.ts @@ -1,5 +1,7 @@ const eventManager = new globalThis.EventManager(); +console.log('event manager running'); + for await (const data of eventManager) { if (data) { switch (data.event_type) {