Skip to content

Commit

Permalink
Properly provide a working Session to loaded tasks
Browse files Browse the repository at this point in the history
  * Adds the concept of a SessionFactory to produce "background"
    sessions.
  * Resuming a task that does e.g. a `notify` now works.
  * Had to refactor `RpcServer` a bit to make this work.
  • Loading branch information
rdaum committed Jul 1, 2024
1 parent 5812e27 commit 0b859ed
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 200 deletions.
37 changes: 23 additions & 14 deletions crates/daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use moor_kernel::config::Config;
use moor_kernel::tasks::scheduler::Scheduler;
use moor_kernel::textdump::textdump_load;

use crate::rpc_server::zmq_loop;
use crate::rpc_server::RpcServer;

#[cfg(feature = "relbox")]
use moor_db_relbox::RelBoxDatabaseBuilder;
Expand Down Expand Up @@ -302,10 +302,26 @@ fn main() -> Result<(), Report> {
let scheduler = Scheduler::new(database, tasks_db, config);
let scheduler_client = scheduler.client().expect("Failed to get scheduler client");

// We have to create the RpcServer before starting the scheduler because we need to pass it in
// as a parameter to the scheduler for background session construction.

let zmq_ctx = zmq::Context::new();
zmq_ctx
.set_io_threads(args.num_io_threads)
.expect("Failed to set number of IO threads");
let rpc_server = Arc::new(RpcServer::new(
keypair,
args.connections_file,
zmq_ctx.clone(),
args.narrative_listen.as_str(),
args.db_flavour,
));

// The scheduler thread:
let scheduler_rpc_server = rpc_server.clone();
let scheduler_loop_jh = std::thread::Builder::new()
.name("moor-scheduler".to_string())
.spawn(move || scheduler.run())?;
.spawn(move || scheduler.run(scheduler_rpc_server))?;

let kill_switch = Arc::new(std::sync::atomic::AtomicBool::new(false));

Expand All @@ -327,22 +343,15 @@ fn main() -> Result<(), Report> {
})?;

let rpc_kill_switch = kill_switch.clone();

let rpc_loop_scheduler_client = scheduler_client.clone();
let rpc_listen = args.rpc_listen.clone();
let rpc_narrative_listen = args.narrative_listen.clone();
let rpc_scheduler_client = scheduler_client.clone();
let rpc_loop_thread = std::thread::Builder::new()
.name("moor-rpc".to_string())
.spawn(move || {
let _ = zmq_loop(
keypair,
args.connections_file,
rpc_scheduler_client,
rpc_listen,
rpc_narrative_listen,
Some(args.num_io_threads),
rpc_kill_switch,
args.db_flavour,
);
rpc_server
.zmq_loop(rpc_listen, rpc_loop_scheduler_client, rpc_kill_switch)
.expect("RPC thread failed");
})?;

signal_hook::flag::register(signal_hook::consts::SIGTERM, kill_switch.clone())?;
Expand Down
Loading

0 comments on commit 0b859ed

Please sign in to comment.