Skip to content

Commit

Permalink
Fix grpc routing for torii server
Browse files Browse the repository at this point in the history
  • Loading branch information
tarrencev committed Nov 8, 2023
1 parent fbceabe commit ec415e4
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 73 deletions.
8 changes: 7 additions & 1 deletion crates/torii/grpc/build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let target = std::env::var("TARGET").expect("failed to get TARGET environment variable");
let out_dir =
PathBuf::from(std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set"));
let target = std::env::var("TARGET").expect("TARGET environment variable not set");
let feature_client = std::env::var("CARGO_FEATURE_CLIENT");
let feature_server = std::env::var("CARGO_FEATURE_SERVER");

Expand All @@ -11,11 +15,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
wasm_tonic_build::configure()
.build_server(false)
.build_client(feature_client.is_ok())
.file_descriptor_set_path(out_dir.join("world_descriptor.bin"))
.compile(&["proto/world.proto"], &["proto"])?;
} else {
tonic_build::configure()
.build_server(feature_server.is_ok())
.build_client(feature_client.is_ok())
.file_descriptor_set_path(out_dir.join("world_descriptor.bin"))
.compile(&["proto/world.proto"], &["proto"])?;
}
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

use futures_util::stream::MapOk;
use futures_util::{Stream, StreamExt, TryStreamExt};
use protos::world::{world_client, SubscribeEntitiesRequest};
use proto::world::{world_client, SubscribeEntitiesRequest};
use starknet::core::types::{FromStrError, StateUpdate};
use starknet_crypto::FieldElement;

use crate::protos::world::{MetadataRequest, SubscribeEntitiesResponse};
use crate::protos::{self};
use crate::proto::world::{MetadataRequest, SubscribeEntitiesResponse};
use crate::proto::{self};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down
58 changes: 29 additions & 29 deletions crates/torii/grpc/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use starknet::core::types::{
};
use starknet_crypto::FieldElement;

use crate::protos;
use crate::proto;

impl TryFrom<protos::types::ModelMetadata> for dojo_types::schema::ModelMetadata {
impl TryFrom<proto::types::ModelMetadata> for dojo_types::schema::ModelMetadata {
type Error = FromStrError;
fn try_from(value: protos::types::ModelMetadata) -> Result<Self, Self::Error> {
fn try_from(value: proto::types::ModelMetadata) -> Result<Self, Self::Error> {
let schema: Ty = serde_json::from_slice(&value.schema).unwrap();
let layout: Vec<FieldElement> = value.layout.into_iter().map(FieldElement::from).collect();
Ok(Self {
Expand All @@ -27,9 +27,9 @@ impl TryFrom<protos::types::ModelMetadata> for dojo_types::schema::ModelMetadata
}
}

impl TryFrom<protos::types::WorldMetadata> for dojo_types::WorldMetadata {
impl TryFrom<proto::types::WorldMetadata> for dojo_types::WorldMetadata {
type Error = FromStrError;
fn try_from(value: protos::types::WorldMetadata) -> Result<Self, Self::Error> {
fn try_from(value: proto::types::WorldMetadata) -> Result<Self, Self::Error> {
let models = value
.models
.into_iter()
Expand All @@ -46,38 +46,38 @@ impl TryFrom<protos::types::WorldMetadata> for dojo_types::WorldMetadata {
}
}

impl From<EntityQuery> for protos::types::EntityQuery {
impl From<EntityQuery> for proto::types::EntityQuery {
fn from(value: EntityQuery) -> Self {
Self { model: value.model, clause: Some(value.clause.into()) }
}
}

impl From<Clause> for protos::types::Clause {
impl From<Clause> for proto::types::Clause {
fn from(value: Clause) -> Self {
match value {
Clause::Keys(clause) => {
Self { clause_type: Some(protos::types::clause::ClauseType::Keys(clause.into())) }
Self { clause_type: Some(proto::types::clause::ClauseType::Keys(clause.into())) }
}
Clause::Attribute(clause) => Self {
clause_type: Some(protos::types::clause::ClauseType::Attribute(clause.into())),
clause_type: Some(proto::types::clause::ClauseType::Attribute(clause.into())),
},
Clause::Composite(clause) => Self {
clause_type: Some(protos::types::clause::ClauseType::Composite(clause.into())),
clause_type: Some(proto::types::clause::ClauseType::Composite(clause.into())),
},
}
}
}

impl From<KeysClause> for protos::types::KeysClause {
impl From<KeysClause> for proto::types::KeysClause {
fn from(value: KeysClause) -> Self {
Self { keys: value.keys.iter().map(|k| k.to_bytes_be().into()).collect() }
}
}

impl TryFrom<protos::types::KeysClause> for KeysClause {
impl TryFrom<proto::types::KeysClause> for KeysClause {
type Error = FromByteSliceError;

fn try_from(value: protos::types::KeysClause) -> Result<Self, Self::Error> {
fn try_from(value: proto::types::KeysClause) -> Result<Self, Self::Error> {
let keys = value
.keys
.into_iter()
Expand All @@ -88,7 +88,7 @@ impl TryFrom<protos::types::KeysClause> for KeysClause {
}
}

impl From<AttributeClause> for protos::types::AttributeClause {
impl From<AttributeClause> for proto::types::AttributeClause {
fn from(value: AttributeClause) -> Self {
Self {
attribute: value.attribute,
Expand All @@ -98,7 +98,7 @@ impl From<AttributeClause> for protos::types::AttributeClause {
}
}

impl From<CompositeClause> for protos::types::CompositeClause {
impl From<CompositeClause> for proto::types::CompositeClause {
fn from(value: CompositeClause) -> Self {
Self {
operator: value.operator as i32,
Expand All @@ -107,41 +107,41 @@ impl From<CompositeClause> for protos::types::CompositeClause {
}
}

impl From<Value> for protos::types::Value {
impl From<Value> for proto::types::Value {
fn from(value: Value) -> Self {
match value {
Value::String(val) => {
Self { value_type: Some(protos::types::value::ValueType::StringValue(val)) }
Self { value_type: Some(proto::types::value::ValueType::StringValue(val)) }
}
Value::Int(val) => {
Self { value_type: Some(protos::types::value::ValueType::IntValue(val)) }
Self { value_type: Some(proto::types::value::ValueType::IntValue(val)) }
}
Value::UInt(val) => {
Self { value_type: Some(protos::types::value::ValueType::UintValue(val)) }
Self { value_type: Some(proto::types::value::ValueType::UintValue(val)) }
}
Value::Bool(val) => {
Self { value_type: Some(protos::types::value::ValueType::BoolValue(val)) }
Self { value_type: Some(proto::types::value::ValueType::BoolValue(val)) }
}
Value::Bytes(val) => {
Self { value_type: Some(protos::types::value::ValueType::ByteValue(val)) }
Self { value_type: Some(proto::types::value::ValueType::ByteValue(val)) }
}
}
}
}

impl TryFrom<protos::types::StorageEntry> for StorageEntry {
impl TryFrom<proto::types::StorageEntry> for StorageEntry {
type Error = FromStrError;
fn try_from(value: protos::types::StorageEntry) -> Result<Self, Self::Error> {
fn try_from(value: proto::types::StorageEntry) -> Result<Self, Self::Error> {
Ok(Self {
key: FieldElement::from_str(&value.key)?,
value: FieldElement::from_str(&value.value)?,
})
}
}

impl TryFrom<protos::types::StorageDiff> for ContractStorageDiffItem {
impl TryFrom<proto::types::StorageDiff> for ContractStorageDiffItem {
type Error = FromStrError;
fn try_from(value: protos::types::StorageDiff) -> Result<Self, Self::Error> {
fn try_from(value: proto::types::StorageDiff) -> Result<Self, Self::Error> {
Ok(Self {
address: FieldElement::from_str(&value.address)?,
storage_entries: value
Expand All @@ -153,9 +153,9 @@ impl TryFrom<protos::types::StorageDiff> for ContractStorageDiffItem {
}
}

impl TryFrom<protos::types::EntityDiff> for StateDiff {
impl TryFrom<proto::types::EntityDiff> for StateDiff {
type Error = FromStrError;
fn try_from(value: protos::types::EntityDiff) -> Result<Self, Self::Error> {
fn try_from(value: proto::types::EntityDiff) -> Result<Self, Self::Error> {
Ok(Self {
nonces: vec![],
declared_classes: vec![],
Expand All @@ -171,9 +171,9 @@ impl TryFrom<protos::types::EntityDiff> for StateDiff {
}
}

impl TryFrom<protos::types::EntityUpdate> for StateUpdate {
impl TryFrom<proto::types::EntityUpdate> for StateUpdate {
type Error = FromStrError;
fn try_from(value: protos::types::EntityUpdate) -> Result<Self, Self::Error> {
fn try_from(value: proto::types::EntityUpdate) -> Result<Self, Self::Error> {
Ok(Self {
new_root: FieldElement::ZERO,
old_root: FieldElement::ZERO,
Expand Down
6 changes: 5 additions & 1 deletion crates/torii/grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ pub mod client;
#[cfg(feature = "server")]
pub mod server;

pub mod protos {
pub mod proto {
pub mod world {
tonic::include_proto!("world");

#[cfg(feature = "server")]
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("world_descriptor");
}
pub mod types {
tonic::include_proto!("types");
Expand Down
33 changes: 20 additions & 13 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;

use dojo_types::schema::KeysClause;
use futures::Stream;
use protos::world::{
use proto::world::{
MetadataRequest, MetadataResponse, SubscribeEntitiesRequest, SubscribeEntitiesResponse,
};
use sqlx::{Pool, Sqlite};
Expand All @@ -26,9 +26,9 @@ use torii_core::error::{Error, ParseError};
use torii_core::model::{parse_sql_model_members, SqlModelMember};

use self::subscription::SubscribeRequest;
use crate::protos::types::clause::ClauseType;
use crate::protos::world::world_server::WorldServer;
use crate::protos::{self};
use crate::proto::types::clause::ClauseType;
use crate::proto::world::world_server::WorldServer;
use crate::proto::{self};

#[derive(Clone)]
pub struct DojoWorld {
Expand Down Expand Up @@ -58,7 +58,7 @@ impl DojoWorld {
}

impl DojoWorld {
pub async fn metadata(&self) -> Result<protos::types::WorldMetadata, Error> {
pub async fn metadata(&self) -> Result<proto::types::WorldMetadata, Error> {
let (world_address, world_class_hash, executor_address, executor_class_hash): (
String,
String,
Expand All @@ -81,7 +81,7 @@ impl DojoWorld {
let mut models_metadata = Vec::with_capacity(models.len());
for model in models {
let schema = self.model_schema(&model.0).await?;
models_metadata.push(protos::types::ModelMetadata {
models_metadata.push(proto::types::ModelMetadata {
name: model.0,
class_hash: model.1,
packed_size: model.2,
Expand All @@ -90,8 +90,9 @@ impl DojoWorld {
schema: serde_json::to_vec(&schema).unwrap(),
});
}
println!("{:?}", world_address);

Ok(protos::types::WorldMetadata {
Ok(proto::types::WorldMetadata {
world_address,
world_class_hash,
executor_address,
Expand All @@ -112,7 +113,7 @@ impl DojoWorld {
Ok(parse_sql_model_members(model, &model_members))
}

pub async fn model_metadata(&self, model: &str) -> Result<protos::types::ModelMetadata, Error> {
pub async fn model_metadata(&self, model: &str) -> Result<proto::types::ModelMetadata, Error> {
let (name, class_hash, packed_size, unpacked_size, layout): (
String,
String,
Expand All @@ -129,7 +130,7 @@ impl DojoWorld {
let schema = self.model_schema(model).await?;
let layout = hex::decode(&layout).unwrap();

Ok(protos::types::ModelMetadata {
Ok(proto::types::ModelMetadata {
name,
layout,
class_hash,
Expand All @@ -141,8 +142,8 @@ impl DojoWorld {

async fn subscribe_entities(
&self,
queries: Vec<protos::types::EntityQuery>,
) -> Result<Receiver<Result<protos::world::SubscribeEntitiesResponse, tonic::Status>>, Error>
queries: Vec<proto::types::EntityQuery>,
) -> Result<Receiver<Result<proto::world::SubscribeEntitiesResponse, tonic::Status>>, Error>
{
let mut subs = Vec::with_capacity(queries.len());
for query in queries {
Expand All @@ -160,7 +161,7 @@ impl DojoWorld {
let model = cairo_short_string_to_felt(&query.model)
.map_err(ParseError::CairoShortStringToFelt)?;

let protos::types::ModelMetadata { packed_size, .. } =
let proto::types::ModelMetadata { packed_size, .. } =
self.model_metadata(&query.model).await?;

subs.push(SubscribeRequest {
Expand All @@ -183,7 +184,7 @@ type SubscribeEntitiesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeEntitiesResponse, Status>> + Send>>;

#[tonic::async_trait]
impl protos::world::world_server::World for DojoWorld {
impl proto::world::world_server::World for DojoWorld {
async fn world_metadata(
&self,
_request: Request<MetadataRequest>,
Expand Down Expand Up @@ -222,12 +223,18 @@ pub async fn new(
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;

let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::world::FILE_DESCRIPTOR_SET)
.build()
.unwrap();

let world = DojoWorld::new(pool.clone(), block_rx, world_address, provider);
let server = WorldServer::new(world);

let server_future = Server::builder()
// GrpcWeb is over http1 so we must enable it.
.accept_http1(true)
.add_service(reflection)
.add_service(tonic_web::enable(server))
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move {
shutdown_rx.recv().await.map_or((), |_| ())
Expand Down
18 changes: 9 additions & 9 deletions crates/torii/grpc/src/server/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::sync::RwLock;
use tracing::{debug, error, trace};

use super::error::SubscriptionError as Error;
use crate::protos;
use crate::proto;

pub struct ModelMetadata {
pub name: FieldElement,
Expand All @@ -34,7 +34,7 @@ pub struct Subscriber {
/// The storage addresses that the subscriber is interested in.
storage_addresses: HashSet<FieldElement>,
/// The channel to send the response back to the subscriber.
sender: Sender<Result<protos::world::SubscribeEntitiesResponse, tonic::Status>>,
sender: Sender<Result<proto::world::SubscribeEntitiesResponse, tonic::Status>>,
}

#[derive(Default)]
Expand All @@ -46,7 +46,7 @@ impl SubscriberManager {
pub(super) async fn add_subscriber(
&self,
entities: Vec<SubscribeRequest>,
) -> Receiver<Result<protos::world::SubscribeEntitiesResponse, tonic::Status>> {
) -> Receiver<Result<proto::world::SubscribeEntitiesResponse, tonic::Status>> {
let id = rand::thread_rng().gen::<usize>();

let (sender, receiver) = channel(1);
Expand Down Expand Up @@ -139,25 +139,25 @@ where
.filter(|entry| sub.storage_addresses.contains(&entry.key))
.map(|entry| {
let StorageEntry { key, value } = entry;
protos::types::StorageEntry {
proto::types::StorageEntry {
key: format!("{key:#x}"),
value: format!("{value:#x}"),
}
})
.collect::<Vec<protos::types::StorageEntry>>();
.collect::<Vec<proto::types::StorageEntry>>();

let entity_update = protos::types::EntityUpdate {
let entity_update = proto::types::EntityUpdate {
block_hash: format!("{:#x}", state_update.block_hash),
entity_diff: Some(protos::types::EntityDiff {
storage_diffs: vec![protos::types::StorageDiff {
entity_diff: Some(proto::types::EntityDiff {
storage_diffs: vec![proto::types::StorageDiff {
address: format!("{contract_address:#x}"),
storage_entries: relevant_storage_entries,
}],
}),
};

let resp =
protos::world::SubscribeEntitiesResponse { entity_update: Some(entity_update) };
proto::world::SubscribeEntitiesResponse { entity_update: Some(entity_update) };

if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
Expand Down
Loading

0 comments on commit ec415e4

Please sign in to comment.