diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index acd0f04f83..8c9e0882c7 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -312,7 +312,7 @@ impl Sql { Argument::String(event_id.to_string()), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], - QueryType::Other, + QueryType::EventMessage(entity.clone()), ); self.query_queue.enqueue( "INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index 7ef1472068..7be8d30092 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -12,9 +12,15 @@ mod test { use crypto_bigint::U256; use dojo_types::primitive::Primitive; use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty}; + use dojo_world::contracts::abi::model::Layout; + use futures::StreamExt; use katana_runner::KatanaRunner; use serde_json::Number; + use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use starknet::core::types::Felt; + use torii_core::simple_broker::SimpleBroker; + use torii_core::sql::Sql; + use torii_core::types::EventMessage; #[cfg(target_arch = "wasm32")] use wasm_bindgen_test::*; @@ -680,6 +686,57 @@ mod test { } } + // Test to verify that setting an entity message in the SQL database + // triggers a publish event on the broker + #[tokio::test] + async fn test_entity_message_trigger_publish() -> Result<(), Box> { + let _ = tracing_subscriber::fmt() + .with_env_filter("torii::relay::client=debug,torii::relay::server=debug") + .try_init(); + + let options = ::from_str("sqlite::memory:") + .unwrap() + .create_if_missing(true); + let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); + sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + + let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); + let mut broker = SimpleBroker::::subscribe(); + + let entity = Ty::Struct(Struct { name: "Message".to_string(), children: vec![] }); + db.register_model( + "test_namespace", + entity.clone(), + Layout::Fixed(vec![]), + Felt::ZERO, + Felt::ZERO, + 0, + 0, + 0, + ) + .await?; + + // FIXME: register_model and set_event_message handle the name and namespace of entity type + // differently. + let entity = + Ty::Struct(Struct { name: "test_namespace-Message".to_string(), children: vec![] }); + + // Set the event message in the database + db.set_event_message(entity, "some_entity_id", 0).await?; + db.query_queue.execute_all().await?; + + // Check if a message was published to the broker + tokio::select! { + Some(message) = broker.next() => { + println!("Received message: {:?}", message); + Ok(()) + }, + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => { + Err("Timeout: No message received".into()) + } + } + } + #[cfg(target_arch = "wasm32")] #[wasm_bindgen_test] async fn test_client_connection_wasm() -> Result<(), Box> {