diff --git a/crates/daemon/src/main.rs b/crates/daemon/src/main.rs index c06e1ab1..ec0f5356 100644 --- a/crates/daemon/src/main.rs +++ b/crates/daemon/src/main.rs @@ -32,7 +32,7 @@ use moor_kernel::config::Config; use moor_kernel::tasks::scheduler::Scheduler; use moor_kernel::textdump::textdump_load; -use crate::rpc_server::zmq_loop; +use crate::rpc_server::RpcServer; #[cfg(feature = "relbox")] use moor_db_relbox::RelBoxDatabaseBuilder; @@ -302,10 +302,26 @@ fn main() -> Result<(), Report> { let scheduler = Scheduler::new(database, tasks_db, config); let scheduler_client = scheduler.client().expect("Failed to get scheduler client"); + // We have to create the RpcServer before starting the scheduler because we need to pass it in + // as a parameter to the scheduler for background session construction. + + let zmq_ctx = zmq::Context::new(); + zmq_ctx + .set_io_threads(args.num_io_threads) + .expect("Failed to set number of IO threads"); + let rpc_server = Arc::new(RpcServer::new( + keypair, + args.connections_file, + zmq_ctx.clone(), + args.narrative_listen.as_str(), + args.db_flavour, + )); + // The scheduler thread: + let scheduler_rpc_server = rpc_server.clone(); let scheduler_loop_jh = std::thread::Builder::new() .name("moor-scheduler".to_string()) - .spawn(move || scheduler.run())?; + .spawn(move || scheduler.run(scheduler_rpc_server))?; let kill_switch = Arc::new(std::sync::atomic::AtomicBool::new(false)); @@ -327,22 +343,15 @@ fn main() -> Result<(), Report> { })?; let rpc_kill_switch = kill_switch.clone(); + + let rpc_loop_scheduler_client = scheduler_client.clone(); let rpc_listen = args.rpc_listen.clone(); - let rpc_narrative_listen = args.narrative_listen.clone(); - let rpc_scheduler_client = scheduler_client.clone(); let rpc_loop_thread = std::thread::Builder::new() .name("moor-rpc".to_string()) .spawn(move || { - let _ = zmq_loop( - keypair, - args.connections_file, - rpc_scheduler_client, - rpc_listen, - rpc_narrative_listen, - Some(args.num_io_threads), - rpc_kill_switch, - args.db_flavour, - ); + rpc_server + .zmq_loop(rpc_listen, rpc_loop_scheduler_client, rpc_kill_switch) + .expect("RPC thread failed"); })?; signal_hook::flag::register(signal_hook::consts::SIGTERM, kill_switch.clone())?; diff --git a/crates/daemon/src/rpc_server.rs b/crates/daemon/src/rpc_server.rs index 84a1c043..580c1b95 100644 --- a/crates/daemon/src/rpc_server.rs +++ b/crates/daemon/src/rpc_server.rs @@ -35,7 +35,7 @@ use zmq::{Socket, SocketType}; use moor_kernel::tasks::scheduler::SchedulerError::CommandExecutionError; use moor_kernel::tasks::scheduler::{SchedulerError, TaskResult}; use moor_kernel::tasks::sessions::SessionError::DeliveryError; -use moor_kernel::tasks::sessions::{Session, SessionError}; +use moor_kernel::tasks::sessions::{Session, SessionError, SessionFactory}; use moor_kernel::tasks::TaskHandle; use moor_values::model::{CommandError, NarrativeEvent}; use moor_values::util::parse_into_words; @@ -60,9 +60,9 @@ use crate::rpc_session::RpcSession; use crate::connections_rb::ConnectionsRb; pub struct RpcServer { + zmq_context: zmq::Context, keypair: Key<64>, publish: Arc>, - scheduler: SchedulerClient, connections: Arc, } @@ -73,13 +73,13 @@ pub(crate) fn make_response(result: Result) -> Vec }; bincode::encode_to_vec(&rpc_result, bincode::config::standard()).unwrap() } + impl RpcServer { pub fn new( keypair: Key<64>, connections_db_path: PathBuf, zmq_context: zmq::Context, narrative_endpoint: &str, - scheduler: SchedulerClient, // For determining the flavor for the connections database. db_flavor: DatabaseFlavour, ) -> Self { @@ -104,14 +104,120 @@ impl RpcServer { ); Self { keypair, - scheduler, connections, publish: Arc::new(Mutex::new(publish)), + zmq_context, + } + } + + pub(crate) fn zmq_loop( + self: Arc, + rpc_endpoint: String, + scheduler_client: SchedulerClient, + kill_switch: Arc, + ) -> eyre::Result<()> { + // Start up the ping-ponger timer in a background thread... + let t_rpc_server = self.clone(); + std::thread::Builder::new() + .name("rpc-ping-pong".to_string()) + .spawn(move || loop { + std::thread::sleep(std::time::Duration::from_secs(5)); + t_rpc_server.ping_pong().expect("Unable to play ping-pong"); + })?; + + // We need to bind a generic publisher to the narrative endpoint, so that subsequent sessions + // are visible... + let rpc_socket = self.zmq_context.socket(zmq::REP)?; + rpc_socket.bind(&rpc_endpoint)?; + + info!( + "0mq server listening on {} with {} IO threads", + rpc_endpoint, + self.zmq_context.get_io_threads().unwrap() + ); + + let this = self.clone(); + loop { + if kill_switch.load(Ordering::Relaxed) { + info!("Kill switch activated, exiting"); + return Ok(()); + } + let poll_result = rpc_socket + .poll(zmq::POLLIN, 100) + .with_context(|| "Error polling ZMQ socket. Bailing out.")?; + if poll_result == 0 { + continue; + } + match rpc_socket.recv_multipart(0) { + Err(_) => { + info!("ZMQ socket closed, exiting"); + return Ok(()); + } + Ok(request) => { + trace!(num_parts = request.len(), "ZQM Request received"); + + // Components are: + if request.len() != 2 { + error!("Invalid request received, ignoring"); + + rpc_socket.send_multipart( + vec![make_response(Err(RpcRequestError::InvalidRequest))], + 0, + )?; + continue; + } + + if request.len() != 2 { + rpc_socket.send_multipart( + vec![make_response(Err(RpcRequestError::InvalidRequest))], + 0, + )?; + continue; + } + + let (client_id, request_body) = (&request[0], &request[1]); + + let Ok(client_id) = Uuid::from_slice(client_id) else { + rpc_socket.send_multipart( + vec![make_response(Err(RpcRequestError::InvalidRequest))], + 0, + )?; + continue; + }; + + // Decode 'request_body' as a bincode'd ClientEvent. + let request = + match bincode::decode_from_slice(request_body, bincode::config::standard()) + { + Ok((request, _)) => request, + Err(_) => { + rpc_socket.send_multipart( + vec![make_response(Err(RpcRequestError::InvalidRequest))], + 0, + )?; + + continue; + } + }; + + // The remainder of the payload are all the request arguments, which vary depending + // on the type. + let response = + this.clone() + .process_request(scheduler_client.clone(), client_id, request); + rpc_socket.send_multipart(vec![response], 0)?; + } + } } } /// Process a request (originally ZMQ REQ) and produce a reply (becomes ZMQ REP) - pub fn process_request(self: Arc, client_id: Uuid, request: RpcRequest) -> Vec { + pub fn process_request( + self: Arc, + scheduler_client: SchedulerClient, + client_id: Uuid, + request: RpcRequest, + ) -> Vec { match request { RpcRequest::ConnectionEstablish(hostname) => { match self.connections.new_connection(client_id, hostname, None) { @@ -139,10 +245,12 @@ impl RpcServer { if let Some(connect_type) = connect_type { trace!(?player, "Submitting user_connected task"); - if let Err(e) = - self.clone() - .submit_connected_task(client_id, player, connect_type) - { + if let Err(e) = self.clone().submit_connected_task( + scheduler_client, + client_id, + player, + connect_type, + ) { error!(error = ?e, "Error submitting user_connected task"); // Note we still continue to return a successful login result here, hoping for the best @@ -193,7 +301,12 @@ impl RpcServer { return make_response(Err(RpcRequestError::PermissionDenied)); }; - make_response(self.clone().request_sys_prop(connection, object, property)) + make_response(self.clone().request_sys_prop( + scheduler_client, + connection, + object, + property, + )) } RpcRequest::LoginCommand(token, args, attach) => { let Some(connection) = self.connections.connection_object_for_client(client_id) @@ -209,10 +322,13 @@ impl RpcServer { return make_response(Err(RpcRequestError::PermissionDenied)); }; - make_response( - self.clone() - .perform_login(client_id, connection, args, attach), - ) + make_response(self.clone().perform_login( + scheduler_client, + client_id, + connection, + args, + attach, + )) } RpcRequest::Command(token, auth_token, command) => { let Some(connection) = self.connections.connection_object_for_client(client_id) @@ -237,7 +353,12 @@ impl RpcServer { ); return make_response(Err(RpcRequestError::PermissionDenied)); }; - make_response(self.clone().perform_command(client_id, connection, command)) + make_response(self.clone().perform_command( + scheduler_client, + client_id, + connection, + command, + )) } RpcRequest::RequestedInput(token, auth_token, request_id, input) => { let Some(connection) = self.connections.connection_object_for_client(client_id) @@ -263,10 +384,13 @@ impl RpcServer { return make_response(Err(RpcRequestError::PermissionDenied)); }; let request_id = Uuid::from_u128(request_id); - make_response( - self.clone() - .respond_input(client_id, connection, request_id, input), - ) + make_response(self.clone().respond_input( + scheduler_client, + client_id, + connection, + request_id, + input, + )) } RpcRequest::OutOfBand(token, auth_token, command) => { let Some(connection) = self.connections.connection_object_for_client(client_id) @@ -291,10 +415,12 @@ impl RpcServer { return make_response(Err(RpcRequestError::PermissionDenied)); }; - make_response( - self.clone() - .perform_out_of_band(client_id, connection, command), - ) + make_response(self.clone().perform_out_of_band( + scheduler_client, + client_id, + connection, + command, + )) } RpcRequest::Eval(token, auth_token, evalstr) => { @@ -320,7 +446,10 @@ impl RpcServer { ); return make_response(Err(RpcRequestError::PermissionDenied)); }; - make_response(self.clone().eval(client_id, connection, evalstr)) + make_response( + self.clone() + .eval(scheduler_client, client_id, connection, evalstr), + ) } RpcRequest::Detach(token) => { let Ok(_) = self.validate_client_token(token, client_id) else { @@ -363,10 +492,14 @@ impl RpcServer { return make_response(Err(RpcRequestError::PermissionDenied)); }; - make_response( - self.clone() - .program_verb(client_id, connection, object, verb, code), - ) + make_response(self.clone().program_verb( + scheduler_client, + client_id, + connection, + object, + verb, + code, + )) } } } @@ -437,6 +570,7 @@ impl RpcServer { fn request_sys_prop( self: Arc, + scheduler_client: SchedulerClient, player: Objid, object: String, property: String, @@ -444,10 +578,7 @@ impl RpcServer { let object = Symbol::mk_case_insensitive(object.as_str()); let property = Symbol::mk_case_insensitive(property.as_str()); - let pv = match self - .scheduler - .request_system_property(player, object, property) - { + let pv = match scheduler_client.request_system_property(player, object, property) { Ok(pv) => pv, Err(CommandExecutionError(CommandError::NoObjectMatch)) => { return Ok(RpcResponse::SysPropValue(None)); @@ -465,6 +596,7 @@ impl RpcServer { fn perform_login( self: Arc, + scheduler_client: SchedulerClient, client_id: Uuid, connection: Objid, args: Vec, @@ -485,7 +617,7 @@ impl RpcServer { let Ok(session) = self.clone().new_session(client_id, connection) else { return Err(RpcRequestError::CreateSessionFailed); }; - let task_handle = match self.clone().scheduler.submit_verb_task( + let task_handle = match scheduler_client.submit_verb_task( connection, SYSTEM_OBJECT, "do_login_command".to_string(), @@ -544,10 +676,12 @@ impl RpcServer { if attach { trace!(?player, "Submitting user_connected task"); - if let Err(e) = self - .clone() - .submit_connected_task(client_id, player, connect_type) - { + if let Err(e) = self.clone().submit_connected_task( + scheduler_client, + client_id, + player, + connect_type, + ) { error!(error = ?e, "Error submitting user_connected task"); // Note we still continue to return a successful login result here, hoping for the best @@ -562,6 +696,7 @@ impl RpcServer { fn submit_connected_task( self: Arc, + scheduler_client: SchedulerClient, client_id: Uuid, player: Objid, initiation_type: ConnectType, @@ -576,7 +711,7 @@ impl RpcServer { ConnectType::Reconnected => "user_reconnected".to_string(), ConnectType::Created => "user_created".to_string(), }; - self.scheduler + scheduler_client .submit_verb_task( player, SYSTEM_OBJECT, @@ -592,6 +727,7 @@ impl RpcServer { fn perform_command( self: Arc, + scheduler_client: SchedulerClient, client_id: Uuid, connection: Objid, command: String, @@ -612,7 +748,7 @@ impl RpcServer { let arguments = parse_into_words(command.as_str()); - if let Ok(do_command_task_handle) = self.clone().scheduler.submit_verb_task( + if let Ok(do_command_task_handle) = scheduler_client.submit_verb_task( connection, SYSTEM_OBJECT, "do_command".to_string(), @@ -638,11 +774,7 @@ impl RpcServer { "Invoking submit_command_task" ); let parse_command_task_handle = - match self - .clone() - .scheduler - .submit_command_task(connection, command.as_str(), session) - { + match scheduler_client.submit_command_task(connection, command.as_str(), session) { Ok(t) => t, Err(SchedulerError::CommandExecutionError(e)) => { return Err(RpcRequestError::CommandError(e)); @@ -660,6 +792,7 @@ impl RpcServer { fn respond_input( self: Arc, + scheduler_client: SchedulerClient, client_id: Uuid, connection: Objid, input_request_id: Uuid, @@ -673,10 +806,7 @@ impl RpcServer { }; // Pass this back over to the scheduler to handle. - if let Err(e) = - self.clone() - .scheduler - .submit_requested_input(connection, input_request_id, input) + if let Err(e) = scheduler_client.submit_requested_input(connection, input_request_id, input) { error!(error = ?e, "Error submitting requested input"); return Err(RpcRequestError::InternalError(e.to_string())); @@ -707,6 +837,7 @@ impl RpcServer { /// Call $do_out_of_band(command) fn perform_out_of_band( self: Arc, + scheduler_client: SchedulerClient, client_id: Uuid, connection: Objid, command: String, @@ -716,7 +847,7 @@ impl RpcServer { }; let command_components = parse_into_words(command.as_str()); - let task_handle = match self.clone().scheduler.submit_out_of_band_task( + let task_handle = match scheduler_client.submit_out_of_band_task( connection, command_components, command, @@ -738,6 +869,7 @@ impl RpcServer { fn eval( self: Arc, + scheduler_client: SchedulerClient, client_id: Uuid, connection: Objid, expression: String, @@ -746,17 +878,14 @@ impl RpcServer { return Err(RpcRequestError::CreateSessionFailed); }; - let task_handle = match self - .clone() - .scheduler - .submit_eval_task(connection, connection, expression, session) - { - Ok(t) => t, - Err(e) => { - error!(error = ?e, "Error submitting eval task"); - return Err(RpcRequestError::InternalError(e.to_string())); - } - }; + let task_handle = + match scheduler_client.submit_eval_task(connection, connection, expression, session) { + Ok(t) => t, + Err(e) => { + error!(error = ?e, "Error submitting eval task"); + return Err(RpcRequestError::InternalError(e.to_string())); + } + }; match task_handle.into_receiver().recv() { Ok(TaskResult::Success(v)) => Ok(RpcResponse::EvalResult(v)), Ok(TaskResult::Error(SchedulerError::CommandExecutionError(e))) => { @@ -777,6 +906,7 @@ impl RpcServer { fn program_verb( self: Arc, + scheduler_client: SchedulerClient, client_id: Uuid, connection: Objid, object: String, @@ -788,11 +918,7 @@ impl RpcServer { }; let verb = Symbol::mk_case_insensitive(verb.as_str()); - match self - .clone() - .scheduler - .submit_verb_program(connection, connection, object, verb, code) - { + match scheduler_client.submit_verb_program(connection, connection, object, verb, code) { Ok((obj, verb)) => Ok(RpcResponse::ProgramSuccess(obj, verb.to_string())), Err(SchedulerError::VerbProgramFailed(e)) => Err(RpcRequestError::VerbProgramFailed(e)), Err(e) => { @@ -1043,118 +1169,11 @@ impl RpcServer { } } -#[allow(clippy::too_many_arguments)] -pub(crate) fn zmq_loop( - keypair: Key<64>, - connections_db_path: PathBuf, - scheduler_client: SchedulerClient, - rpc_endpoint: String, - narrative_endpoint: String, - num_threads: Option, - kill_switch: Arc, - db_flavour: DatabaseFlavour, -) -> eyre::Result<()> { - let zmq_ctx = zmq::Context::new(); - if let Some(num_threads) = num_threads { - zmq_ctx.set_io_threads(num_threads)?; - } - - let rpc_server = Arc::new(RpcServer::new( - keypair, - connections_db_path, - zmq_ctx.clone(), - &narrative_endpoint, - scheduler_client, - db_flavour, - )); - - // Start up the ping-ponger timer in a background thread... - let t_rpc_server = rpc_server.clone(); - std::thread::Builder::new() - .name("rpc-ping-pong".to_string()) - .spawn(move || loop { - std::thread::sleep(std::time::Duration::from_secs(5)); - t_rpc_server.ping_pong().expect("Unable to play ping-pong"); - })?; - - // We need to bind a generic publisher to the narrative endpoint, so that subsequent sessions - // are visible... - let rpc_socket = zmq_ctx.socket(zmq::REP)?; - rpc_socket.bind(&rpc_endpoint)?; - - info!( - "0mq server listening on {} with {} IO threads", - rpc_endpoint, - zmq_ctx.get_io_threads().unwrap() - ); - - loop { - if kill_switch.load(Ordering::Relaxed) { - info!("Kill switch activated, exiting"); - return Ok(()); - } - let poll_result = rpc_socket - .poll(zmq::POLLIN, 100) - .with_context(|| "Error polling ZMQ socket. Bailing out.")?; - if poll_result == 0 { - continue; - } - match rpc_socket.recv_multipart(0) { - Err(_) => { - info!("ZMQ socket closed, exiting"); - return Ok(()); - } - Ok(request) => { - trace!(num_parts = request.len(), "ZQM Request received"); - - // Components are: - if request.len() != 2 { - error!("Invalid request received, ignoring"); - - rpc_socket.send_multipart( - vec![make_response(Err(RpcRequestError::InvalidRequest))], - 0, - )?; - continue; - } - - if request.len() != 2 { - rpc_socket.send_multipart( - vec![make_response(Err(RpcRequestError::InvalidRequest))], - 0, - )?; - continue; - } - - let (client_id, request_body) = (&request[0], &request[1]); - - let Ok(client_id) = Uuid::from_slice(client_id) else { - rpc_socket.send_multipart( - vec![make_response(Err(RpcRequestError::InvalidRequest))], - 0, - )?; - continue; - }; - - // Decode 'request_body' as a bincode'd ClientEvent. - let request = - match bincode::decode_from_slice(request_body, bincode::config::standard()) { - Ok((request, _)) => request, - Err(_) => { - rpc_socket.send_multipart( - vec![make_response(Err(RpcRequestError::InvalidRequest))], - 0, - )?; - - continue; - } - }; - - // The remainder of the payload are all the request arguments, which vary depending - // on the type. - let response = rpc_server.clone().process_request(client_id, request); - rpc_socket.send_multipart(vec![response], 0)?; - } - } +impl SessionFactory for RpcServer { + fn mk_background_session( + self: Arc, + player: Objid, + ) -> Result, SessionError> { + self.clone().new_session(Uuid::new_v4(), player) } } diff --git a/crates/daemon/src/tasks_wt.rs b/crates/daemon/src/tasks_wt.rs index 4cf28e17..56ec5ec4 100644 --- a/crates/daemon/src/tasks_wt.rs +++ b/crates/daemon/src/tasks_wt.rs @@ -245,6 +245,55 @@ impl TasksDb for WiredTigerTasksDb { Ok(()) } + + fn delete_all_tasks(&self) -> Result<(), TasksDbError> { + // Scan the entire tasks table, deserializing each record into a SuspendedTask + let session = self + .connection + .clone() + .open_session(self.session_config.clone()) + .map_err(|e| { + error!("Failed to open session: {:?}", e); + TasksDbError::CouldNotLoadTasks + })?; + + session.begin_transaction(None).unwrap(); + let cursor = session + .open_cursor(&self.tasks_table, Some(CursorConfig::new().raw(true))) + .map_err(|e| { + error!("Failed to open cursor: {:?}", e); + TasksDbError::CouldNotLoadTasks + })?; + + cursor.reset().map_err(|e| { + error!("Failed to reset cursor to start: {:?}", e); + TasksDbError::CouldNotLoadTasks + })?; + + loop { + match cursor.next() { + Ok(_) => {} + Err(Error::NotFound) => { + break; + } + Err(e) => { + error!("Failed to advance cursor: {:?}", e); + return Err(TasksDbError::CouldNotLoadTasks); + } + } + + cursor.remove().map_err(|e| { + error!("Failed to remove record: {:?}", e); + TasksDbError::CouldNotLoadTasks + })?; + } + + session.commit().map_err(|e| { + error!("Failed to commit transaction: {:?}", e); + TasksDbError::CouldNotLoadTasks + })?; + Ok(()) + } } #[cfg(test)] diff --git a/crates/kernel/src/tasks/scheduler.rs b/crates/kernel/src/tasks/scheduler.rs index fabfae4e..beb873e6 100644 --- a/crates/kernel/src/tasks/scheduler.rs +++ b/crates/kernel/src/tasks/scheduler.rs @@ -51,7 +51,7 @@ use crate::matching::ws_match_env::WsMatchEnv; use crate::tasks::command_parse::ParseMatcher; use crate::tasks::scheduler::SchedulerError::VerbProgramFailed; use crate::tasks::scheduler_client::{SchedulerClient, SchedulerClientMsg}; -use crate::tasks::sessions::Session; +use crate::tasks::sessions::{Session, SessionFactory}; use crate::tasks::suspension::{SuspensionQ, WakeCondition}; use crate::tasks::task::Task; use crate::tasks::task_scheduler_client::{TaskControlMsg, TaskSchedulerClient}; @@ -221,10 +221,10 @@ impl Scheduler { } /// Execute the scheduler loop, run from the server process. - #[instrument(skip(self))] - pub fn run(mut self) { + #[instrument(skip(self, bg_session_factory))] + pub fn run(mut self, bg_session_factory: Arc) { // Rehydrate suspended tasks. - self.task_q.suspended.load_tasks(); + self.task_q.suspended.load_tasks(bg_session_factory); self.running = true; info!("Starting scheduler loop"); diff --git a/crates/kernel/src/tasks/sessions.rs b/crates/kernel/src/tasks/sessions.rs index 69614fb2..ef4e2cf6 100644 --- a/crates/kernel/src/tasks/sessions.rs +++ b/crates/kernel/src/tasks/sessions.rs @@ -101,6 +101,14 @@ pub trait Session: Send + Sync { fn idle_seconds(&self, player: Objid) -> Result; } +/// A factory for creating background sessions, usually on task resumption on server restart. +pub trait SessionFactory { + fn mk_background_session( + self: Arc, + player: Objid, + ) -> Result, SessionError>; +} + #[derive(Debug, Error)] pub enum SessionError { #[error("No connection for player {0}")] diff --git a/crates/kernel/src/tasks/suspension.rs b/crates/kernel/src/tasks/suspension.rs index a70eac1a..b49b6121 100644 --- a/crates/kernel/src/tasks/suspension.rs +++ b/crates/kernel/src/tasks/suspension.rs @@ -13,7 +13,7 @@ // use crate::tasks::scheduler::TaskResult; -use crate::tasks::sessions::{NoopClientSession, Session}; +use crate::tasks::sessions::{NoopClientSession, Session, SessionFactory}; use crate::tasks::task::Task; use crate::tasks::{TaskDescription, TaskId, TasksDb}; use bincode::de::{BorrowDecoder, Decoder}; @@ -82,7 +82,7 @@ impl SuspensionQ { /// Load all tasks from the tasks database. Called on startup to reconstitute the task list /// from the database. - pub(crate) fn load_tasks(&mut self) { + pub(crate) fn load_tasks(&mut self, bg_session_factory: Arc) { // LambdaMOO doesn't do anything special to filter out tasks that are too old, or tasks that // are related to disconnected players, or anything like that. // We'll just start them all up and let the scheduler handle them. @@ -93,11 +93,19 @@ impl SuspensionQ { .load_tasks() .expect("Unable to reconstitute tasks from tasks database"); let num_tasks = tasks.len(); - for task in tasks { + for mut task in tasks { debug!(wake_condition = ?task.wake_condition, task_id = task.task.task_id, start = ?task.task.task_start , "Loaded suspended task from tasks database"); + task.session = bg_session_factory + .clone() + .mk_background_session(task.task.player) + .expect("Unable to create new background session for suspended task"); self.tasks.insert(task.task.task_id, task); } + // Now delete them from the database. + if let Err(e) = self.tasks_database.delete_all_tasks() { + error!(?e, "Could not delete suspended tasks from tasks database"); + } info!(?num_tasks, "Loaded suspended tasks from tasks database") } diff --git a/crates/kernel/src/tasks/tasks_db.rs b/crates/kernel/src/tasks/tasks_db.rs index b77f918e..7e9caa3b 100644 --- a/crates/kernel/src/tasks/tasks_db.rs +++ b/crates/kernel/src/tasks/tasks_db.rs @@ -27,6 +27,7 @@ pub trait TasksDb: Send { fn load_tasks(&self) -> Result, TasksDbError>; fn save_task(&self, task: &SuspendedTask) -> Result<(), TasksDbError>; fn delete_task(&self, task_id: TaskId) -> Result<(), TasksDbError>; + fn delete_all_tasks(&self) -> Result<(), TasksDbError>; } pub struct NoopTasksDb {} @@ -43,4 +44,8 @@ impl TasksDb for NoopTasksDb { fn delete_task(&self, _task_id: TaskId) -> Result<(), TasksDbError> { Ok(()) } + + fn delete_all_tasks(&self) -> Result<(), TasksDbError> { + Ok(()) + } } diff --git a/crates/kernel/testsuite/moot_suite.rs b/crates/kernel/testsuite/moot_suite.rs index e4406997..015678a4 100644 --- a/crates/kernel/testsuite/moot_suite.rs +++ b/crates/kernel/testsuite/moot_suite.rs @@ -36,6 +36,7 @@ use moor_values::var::{v_none, Objid, Var}; #[cfg(feature = "relbox")] use common::create_relbox_db; +use moor_kernel::tasks::sessions::{SessionError, SessionFactory}; use moor_kernel::tasks::NoopTasksDb; #[derive(Clone)] @@ -120,6 +121,16 @@ fn test_wiredtiger(path: &Path) { } test_each_file::test_each_path! { in "./crates/kernel/testsuite/moot" as wiredtiger => test_wiredtiger } +struct NoopSessionFactory {} +impl SessionFactory for NoopSessionFactory { + fn mk_background_session( + self: Arc, + _player: Objid, + ) -> Result, SessionError> { + Ok(Arc::new(NoopClientSession::new())) + } +} + fn test(db: Box, path: &Path) { if path.is_dir() { return; @@ -127,9 +138,10 @@ fn test(db: Box, path: &Path) { let tasks_db = Box::new(NoopTasksDb {}); let scheduler = Scheduler::new(db, tasks_db, Config::default()); let scheduler_client = scheduler.client().unwrap(); + let session_factory = Arc::new(NoopSessionFactory {}); let scheduler_loop_jh = std::thread::Builder::new() .name("moor-scheduler".to_string()) - .spawn(move || scheduler.run()) + .spawn(move || scheduler.run(session_factory.clone())) .expect("Failed to spawn scheduler"); execute_moot_test(