Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(libp2p): command to wait for the relay to be ready #1525

Merged
merged 5 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@
pub async fn new(
torii_url: String,
rpc_url: String,
relay_url: String,

Check warning on line 57 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L57

Added line #L57 was not covered by tests
world: FieldElement,
models_keys: Option<Vec<KeysClause>>,
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

let relay_client = torii_relay::client::RelayClient::new(relay_url)?;

Check warning on line 63 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L63

Added line #L63 was not covered by tests

let metadata = grpc_client.metadata().await?;

Expand Down Expand Up @@ -96,11 +96,16 @@
metadata: shared_metadata,
sub_client_handle: OnceCell::new(),
inner: AsyncRwLock::new(grpc_client),
relay_client,

Check warning on line 99 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L99

Added line #L99 was not covered by tests
subscribed_models: subbed_models,
})
}

/// Waits for the relay to be ready and listening for messages.
pub async fn wait_for_relay(&mut self) -> Result<(), Error> {
self.relay_client.command_sender.wait_for_relay().await.map_err(Error::RelayClient)
}

Check warning on line 107 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L105-L107

Added lines #L105 - L107 were not covered by tests

/// Subscribes to a topic.
/// Returns true if the topic was subscribed to.
/// Returns false if the topic was already subscribed to.
Expand All @@ -127,12 +132,12 @@

/// Returns the event loop of the relay client.
/// Which can then be used to run the relay client
pub fn relay_client_runner(&self) -> Arc<Mutex<EventLoop>> {
self.relay_client.event_loop.clone()

Check warning on line 136 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L135-L136

Added lines #L135 - L136 were not covered by tests
}

/// Returns the message receiver of the relay client.
pub fn relay_client_stream(&self) -> Arc<Mutex<UnboundedReceiver<Message>>> {

Check warning on line 140 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L140

Added line #L140 was not covered by tests
self.relay_client.message_receiver.clone()
}

Expand Down Expand Up @@ -210,8 +215,8 @@
/// Initiate the model subscriptions and returns a [SubscriptionService] which when await'ed
/// will execute the subscription service and starts the syncing process.
pub async fn start_subscription(&self) -> Result<SubscriptionService, Error> {
let models_keys: Vec<KeysClause> =
self.subscribed_models.models_keys.read().clone().into_iter().collect();

Check warning on line 219 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L218-L219

Added lines #L218 - L219 were not covered by tests
let sub_res_stream = self.initiate_subscription(models_keys).await?;

let (service, handle) = SubscriptionService::new(
Expand Down
69 changes: 48 additions & 21 deletions crates/torii/libp2p/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum Command {
Subscribe(String, oneshot::Sender<Result<bool, Error>>),
Unsubscribe(String, oneshot::Sender<Result<bool, Error>>),
Publish(String, Vec<u8>, oneshot::Sender<Result<MessageId, Error>>),
WaitForRelay(oneshot::Sender<Result<(), Error>>),
}

impl RelayClient {
Expand Down Expand Up @@ -192,10 +193,21 @@ impl CommandSender {

rx.await.expect("Failed to receive response")
}

pub async fn wait_for_relay(&mut self) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();

self.sender.unbounded_send(Command::WaitForRelay(tx)).expect("Failed to send command");

rx.await.expect("Failed to receive response")
}
}

impl EventLoop {
pub async fn run(&mut self) {
let mut is_relay_ready = false;
let mut relay_ready_tx = None;

loop {
// Poll the swarm for new events.
select! {
Expand All @@ -210,31 +222,48 @@ impl EventLoop {
Command::Publish(topic, data, sender) => {
sender.send(self.publish(topic, data)).expect("Failed to send response");
},
Command::WaitForRelay(sender) => {
if is_relay_ready {
sender.send(Ok(())).expect("Failed to send response");
} else {
relay_ready_tx = Some(sender);
}
}
}
},
event = self.swarm.select_next_some() => {
match event {
SwarmEvent::Behaviour(event) => {
// Handle behaviour events.
if let ClientEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id,
message,
}) = event
{
// deserialize message payload
let message_payload: ServerMessage = serde_json::from_slice(&message.data)
.expect("Failed to deserialize message");

let message = Message {
match event {
// Handle behaviour events.
ClientEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
source: PeerId::from_bytes(&message_payload.peer_id).expect("Failed to parse peer id"),
message_id,
topic: message.topic,
data: message_payload.data,
};

self.message_sender.unbounded_send(message).expect("Failed to send message");
message,
}) => {
// deserialize message payload
let message_payload: ServerMessage = serde_json::from_slice(&message.data)
.expect("Failed to deserialize message");

let message = Message {
propagation_source: peer_id,
source: PeerId::from_bytes(&message_payload.peer_id).expect("Failed to parse peer id"),
message_id,
topic: message.topic,
data: message_payload.data,
};

self.message_sender.unbounded_send(message).expect("Failed to send message");
}
ClientEvent::Gossipsub(gossipsub::Event::Subscribed { topic, .. }) => {
info!(target: "torii::relay::client::gossipsub", topic = ?topic, "Relay ready. Received subscription confirmation");

is_relay_ready = true;
if let Some(tx) = relay_ready_tx.take() {
tx.send(Ok(())).expect("Failed to send response");
}
}
_ => {}
}
}
SwarmEvent::ConnectionClosed { cause: Some(cause), .. } => {
Expand All @@ -245,9 +274,7 @@ impl EventLoop {
return;
}
}
evt => {
info!(target: "torii::relay::client", event = ?evt, "Unhandled event");
}
_ => {}
}
},
}
Expand Down
27 changes: 23 additions & 4 deletions crates/torii/libp2p/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
let message = serde_json::from_slice::<ClientMessage>(&message.data);
if let Err(e) = message {
info!(
target: "torii::relay::server",
target: "torii::relay::server::gossipsub",

Check warning on line 150 in crates/torii/libp2p/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/libp2p/src/server/mod.rs#L150

Added line #L150 was not covered by tests
error = %e,
"Failed to deserialize message"
);
Expand Down Expand Up @@ -176,18 +176,37 @@
.as_bytes(),
) {
info!(
target: "torii::relay::server",
target: "torii::relay::server::gossipsub",

Check warning on line 179 in crates/torii/libp2p/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/libp2p/src/server/mod.rs#L179

Added line #L179 was not covered by tests
error = %e,
"Failed to publish message"
);
}
}
ServerEvent::Gossipsub(gossipsub::Event::Subscribed { peer_id, topic }) => {
info!(
target: "torii::relay::server::gossipsub",
peer_id = %peer_id,
topic = %topic,
"Subscribed to topic"
);
}
ServerEvent::Gossipsub(gossipsub::Event::Unsubscribed {
peer_id,
topic,

Check warning on line 195 in crates/torii/libp2p/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/libp2p/src/server/mod.rs#L194-L195

Added lines #L194 - L195 were not covered by tests
}) => {
info!(
target: "torii::relay::server::gossipsub",
peer_id = %peer_id,
topic = %topic,
"Unsubscribed from topic"
);

Check warning on line 202 in crates/torii/libp2p/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/libp2p/src/server/mod.rs#L197-L202

Added lines #L197 - L202 were not covered by tests
}
ServerEvent::Identify(identify::Event::Received {
info: identify::Info { observed_addr, .. },
peer_id,
}) => {
info!(
target: "torii::relay::server",
target: "torii::relay::server::identify",
peer_id = %peer_id,
observed_addr = %observed_addr,
"Received identify event"
Expand All @@ -196,7 +215,7 @@
}
ServerEvent::Ping(ping::Event { peer, result, .. }) => {
info!(
target: "torii::relay::server",
target: "torii::relay::server::ping",
peer_id = %peer,
result = ?result,
"Received ping event"
Expand Down
18 changes: 8 additions & 10 deletions crates/torii/libp2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ mod test {
.with_env_filter("torii::relay::client=debug,torii::relay::server=debug")
.try_init();
// Initialize the relay server
let mut relay_server: Relay = Relay::new(9090, 9091, None, None)?;
let mut relay_server: Relay = Relay::new(9900, 9901, None, None)?;
tokio::spawn(async move {
relay_server.run().await;
});

// Initialize the first client (listener)
let mut client = RelayClient::new("/ip4/127.0.0.1/tcp/9090".to_string())?;
let mut client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string())?;
tokio::spawn(async move {
client.event_loop.lock().await.run().await;
});

client.command_sender.subscribe("mawmaw".to_string()).await?;
sleep(Duration::from_secs(1)).await;
client.command_sender.wait_for_relay().await?;
client.command_sender.publish("mawmaw".to_string(), "mimi".as_bytes().to_vec()).await?;

let message_receiver = client.message_receiver.clone();
Expand Down Expand Up @@ -73,23 +73,21 @@ mod test {
// Make sure the cert hash is correct - corresponding to the cert in the relay server
let mut client = RelayClient::new(
"/ip4/127.0.0.1/udp/9091/webrtc-direct/certhash/\
uEiD6v3wzt8XU3s3SqgNSBJPvn9E0VMVFm8-G0iSEsIIDxw"
uEiCAoeHQh49fCHDolECesXO0CPR7fpz0sv0PWVaIahzT4g"
.to_string(),
)?;

spawn_local(async move {
client.event_loop.lock().await.run().await;
});

// Give some time for the client to start up
let _ = wasm_timer::Delay::new(std::time::Duration::from_secs(10)).await;

client.command_sender.subscribe("mawmaw".to_string()).await?;
let _ = wasm_timer::Delay::new(std::time::Duration::from_secs(1)).await;
client.command_sender.wait_for_relay().await?;
client.command_sender.publish("mawmaw".to_string(), "mimi".as_bytes().to_vec()).await?;

let timeout = wasm_timer::Delay::new(std::time::Duration::from_secs(5));
let message_future = client.message_receiver.next();
let timeout = wasm_timer::Delay::new(std::time::Duration::from_secs(2));
let mut message_future = client.message_receiver.lock().await;
let message_future = message_future.next();

match select(message_future, timeout).await {
Either::Left((Some(_message), _)) => {
Expand Down
Loading