Skip to content

Commit

Permalink
fix: add metadata to events
Browse files Browse the repository at this point in the history
  • Loading branch information
laktek committed Jul 10, 2023
1 parent 87aada8 commit a77fa85
Show file tree
Hide file tree
Showing 20 changed files with 184 additions and 148 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sb_env = { version = "0.1.0", path = "../sb_env" }
sb_core = { version = "0.1.0", path = "../sb_core" }
sb_os = { version = "0.1.0", path = "../sb_os" }
urlencoding = { version = "2.1.2" }
uuid = { workspace = true }

[dev-dependencies]
futures-util = { version = "0.3.28" }
Expand Down
70 changes: 24 additions & 46 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ use sb_core::permissions::{sb_core_permissions, Permissions};
use sb_core::runtime::sb_core_runtime;
use sb_core::sb_core_main_js;
use sb_env::sb_env as sb_env_op;
use sb_worker_context::essentials::{
EventWorkerRuntimeOpts, UserWorkerMsgs, WorkerContextInitOpts, WorkerRuntimeOpts,
};
use sb_worker_context::events::WorkerEvents;
use sb_worker_context::essentials::{UserWorkerMsgs, WorkerContextInitOpts, WorkerRuntimeOpts};
use sb_worker_context::events::WorkerEventWithMetadata;
use sb_workers::events::sb_user_event_worker;
use sb_workers::sb_user_workers;

Expand Down Expand Up @@ -115,26 +113,16 @@ pub struct DenoRuntime {
}

impl DenoRuntime {
pub async fn new(
opts: WorkerContextInitOpts,
event_manager_opts: Option<EventWorkerRuntimeOpts>, // TODO: refactor this be part of opts
) -> Result<Self, Error> {
let mut maybe_events_msg_tx: Option<mpsc::UnboundedSender<WorkerEvents>> = None;

pub async fn new(opts: WorkerContextInitOpts) -> Result<Self, Error> {
let WorkerContextInitOpts {
service_path,
no_module_cache,
import_map_path,
env_vars,
events_rx,
conf,
} = opts;

if conf.is_user_worker() {
if let Some(events_msg_tx) = conf.as_user_worker().unwrap().events_msg_tx.clone() {
maybe_events_msg_tx = Some(events_msg_tx)
}
}

set_v8_flags();

let user_agent = "supabase-edge-runtime".to_string();
Expand Down Expand Up @@ -237,17 +225,14 @@ impl DenoRuntime {
op_state.put::<sb_env::EnvVars>(env_vars);

if conf.is_events_worker() {
if let WorkerRuntimeOpts::EventsWorker = conf.clone() {
// We unwrap because event_manager_opts must always be present when type is `EventsWorker`
op_state.put::<mpsc::UnboundedReceiver<WorkerEvents>>(
event_manager_opts.unwrap().event_rx,
);
}
// if worker is an events worker, assert events_rx is to be available
op_state
.put::<mpsc::UnboundedReceiver<WorkerEventWithMetadata>>(events_rx.unwrap());
}

if conf.is_user_worker() {
if let Some(events_msg_tx) = maybe_events_msg_tx.clone() {
op_state.put::<mpsc::UnboundedSender<WorkerEvents>>(events_msg_tx);
if let Some(events_msg_tx) = conf.as_user_worker().unwrap().events_msg_tx.clone() {
op_state.put::<mpsc::UnboundedSender<WorkerEventWithMetadata>>(events_msg_tx);
}
}
}
Expand All @@ -262,13 +247,6 @@ impl DenoRuntime {
})
}

pub fn event_sender(&self) -> Option<mpsc::UnboundedSender<WorkerEvents>> {
self.conf
.as_user_worker()
.map(|c| c.events_msg_tx.clone())
.unwrap()
}

pub async fn run(
mut self,
unix_stream_rx: mpsc::UnboundedReceiver<UnixStream>,
Expand Down Expand Up @@ -353,22 +331,20 @@ mod test {
) -> DenoRuntime {
let (worker_pool_tx, _) = mpsc::unbounded_channel::<UserWorkerMsgs>();

DenoRuntime::new(
WorkerContextInitOpts {
service_path: path.unwrap_or(PathBuf::from("./test_cases/main")),
no_module_cache: false,
import_map_path: None,
env_vars: env_vars.unwrap_or(Default::default()),
conf: {
if let Some(uc) = user_conf {
uc
} else {
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx })
}
},
DenoRuntime::new(WorkerContextInitOpts {
service_path: path.unwrap_or(PathBuf::from("./test_cases/main")),
no_module_cache: false,
import_map_path: None,
env_vars: env_vars.unwrap_or(Default::default()),
events_rx: None,
conf: {
if let Some(uc) = user_conf {
uc
} else {
WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { worker_pool_tx })
}
},
None,
)
})
.await
.unwrap()
}
Expand Down Expand Up @@ -624,6 +600,8 @@ mod test {
key: None,
pool_msg_tx: None,
events_msg_tx: None,
execution_id: None,
service_path: None,
})),
)
.await
Expand Down
36 changes: 17 additions & 19 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::worker_ctx::{
create_event_worker, create_user_worker_pool, create_worker, WorkerRequestMsg,
create_events_worker, create_user_worker_pool, create_worker, WorkerRequestMsg,
};
use anyhow::Error;
use hyper::{server::conn::Http, service::Service, Body, Request, Response};
use log::{debug, error, info};
use sb_worker_context::essentials::{
MainWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRuntimeOpts,
};
use sb_worker_context::events::WorkerEvents;
use sb_worker_context::events::WorkerEventWithMetadata;
use std::future::Future;
use std::net::IpAddr;
use std::net::Ipv4Addr;
Expand Down Expand Up @@ -92,38 +92,36 @@ impl Server {
no_module_cache: bool,
callback_tx: Option<Sender<ServerCodes>>,
) -> Result<Self, Error> {
let mut worker_event_sender: Option<mpsc::UnboundedSender<WorkerEvents>> = None;
let mut worker_events_sender: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>> = None;

// Create Event Worker
if let Some(events_service_path) = maybe_events_service_path {
let events_path = Path::new(&events_service_path);
let events_path_buf = events_path.to_path_buf();

let event_worker =
create_event_worker(events_path_buf, import_map_path.clone(), no_module_cache)
let events_worker =
create_events_worker(events_path_buf, import_map_path.clone(), no_module_cache)
.await
.expect("Event worker could not be created");

worker_event_sender = Some(event_worker);
worker_events_sender = Some(events_worker);
}

// Create a user worker pool
let user_worker_msgs_tx = create_user_worker_pool(worker_event_sender).await?;
let user_worker_msgs_tx = create_user_worker_pool(worker_events_sender).await?;

// create main worker
let main_path = Path::new(&main_service_path);
let main_worker_req_tx = create_worker(
WorkerContextInitOpts {
service_path: main_path.to_path_buf(),
import_map_path,
no_module_cache,
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
worker_pool_tx: user_worker_msgs_tx,
}),
env_vars: std::env::vars().collect(),
},
None,
)
let main_worker_req_tx = create_worker(WorkerContextInitOpts {
service_path: main_path.to_path_buf(),
import_map_path,
no_module_cache,
events_rx: None,
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
worker_pool_tx: user_worker_msgs_tx,
}),
env_vars: std::env::vars().collect(),
})
.await?;

// register alarm signal handler
Expand Down
7 changes: 4 additions & 3 deletions crates/base/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use sb_worker_context::events::WorkerEvents;
use sb_worker_context::events::{EventMetadata, WorkerEventWithMetadata, WorkerEvents};
use tokio::sync::mpsc;

pub mod units;

pub fn send_event_if_event_manager_available(
maybe_event_manager: Option<mpsc::UnboundedSender<WorkerEvents>>,
maybe_event_manager: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>>,
event: WorkerEvents,
metadata: EventMetadata,
) {
if let Some(event_manager) = maybe_event_manager {
let _ = event_manager.send(event);
let _ = event_manager.send(WorkerEventWithMetadata { event, metadata });
}
}
Loading

0 comments on commit a77fa85

Please sign in to comment.