Skip to content

Commit

Permalink
opt(torii): avoid re-processing of transactions in certain case
Browse files Browse the repository at this point in the history
fix: #2355

commit-id:a510b985
  • Loading branch information
lambda-0x committed Sep 3, 2024
1 parent 6ebd806 commit 0ca96a5
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 34 deletions.
63 changes: 40 additions & 23 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct FetchRangeResult {
#[derive(Debug)]
pub struct FetchPendingResult {
pub pending_block: Box<PendingBlockWithReceipts>,
pub pending_block_tx: Option<Felt>,
pub last_pending_block_tx: Option<Felt>,
pub block_number: u64,
}

Expand Down Expand Up @@ -114,9 +114,14 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
}

pub async fn start(&mut self) -> Result<()> {
let (head, pending_block_tx) = self.db.head().await?;
// use the start block provided by user if head is 0
let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?;

Check warning on line 118 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L118

Added line #L118 was not covered by tests
if head == 0 {
self.db.set_head(head, pending_block_tx);
self.db.set_head(
self.config.start_block,
last_pending_block_world_tx,
last_pending_block_tx,
);

Check warning on line 124 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L120-L124

Added lines #L120 - L124 were not covered by tests
} else if self.config.start_block != 0 {
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
}
Expand All @@ -128,12 +133,12 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {

let mut erroring_out = false;
loop {
let (head, pending_block_tx) = self.db.head().await?;
let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?;

Check warning on line 136 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L136

Added line #L136 was not covered by tests
tokio::select! {
_ = shutdown_rx.recv() => {
break Ok(());
}
res = self.fetch_data(head, pending_block_tx) => {
res = self.fetch_data(head, last_pending_block_world_tx, last_pending_block_tx) => {
match res {
Ok(fetch_result) => {
if erroring_out {
Expand Down Expand Up @@ -172,17 +177,19 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
pub async fn fetch_data(
&mut self,
from: u64,
pending_block_tx: Option<Felt>,
last_pending_block_world_tx: Option<Felt>,
last_pending_block_tx: Option<Felt>,

Check warning on line 181 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L180-L181

Added lines #L180 - L181 were not covered by tests
) -> Result<FetchDataResult> {
let latest_block_number = self.provider.block_hash_and_number().await?.block_number;

let result = if from < latest_block_number {
let from = if from == 0 { from } else { from + 1 };
debug!(target: LOG_TARGET, from = %from, to = %latest_block_number, "Fetching data for range.");
let data = self.fetch_range(from, latest_block_number, pending_block_tx).await?;
let data =
self.fetch_range(from, latest_block_number, last_pending_block_world_tx).await?;

Check warning on line 189 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L188-L189

Added lines #L188 - L189 were not covered by tests
FetchDataResult::Range(data)
} else if self.config.index_pending {
let data = self.fetch_pending(latest_block_number + 1, pending_block_tx).await?;
let data = self.fetch_pending(latest_block_number + 1, last_pending_block_tx).await?;

Check warning on line 192 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L192

Added line #L192 was not covered by tests
if let Some(data) = data {
FetchDataResult::Pending(data)
} else {
Expand All @@ -199,7 +206,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
&mut self,
from: u64,
to: u64,
pending_block_tx: Option<Felt>,
last_pending_block_world_tx: Option<Felt>,
) -> Result<FetchRangeResult> {
// Process all blocks from current to latest.
let get_events = |token: Option<String>| {
Expand Down Expand Up @@ -230,7 +237,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {

// Flatten events pages and events according to the pending block cursor
// to array of (block_number, transaction_hash)
let mut pending_block_tx_cursor = pending_block_tx;
let mut last_pending_block_world_tx_cursor = last_pending_block_world_tx;
let mut transactions = LinkedHashMap::new();
for events_page in events_pages {
debug!("Processing events page with events: {}", &events_page.events.len());
Expand Down Expand Up @@ -270,17 +277,17 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {

// Then we skip all transactions until we reach the last pending processed
// transaction (if any)
if let Some(tx) = pending_block_tx_cursor {
if let Some(tx) = last_pending_block_world_tx_cursor {
if event.transaction_hash != tx {
continue;
}

pending_block_tx_cursor = None;
last_pending_block_world_tx_cursor = None;

Check warning on line 285 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L285

Added line #L285 was not covered by tests
}

// Skip the latest pending block transaction events
// * as we might have multiple events for the same transaction
if let Some(tx) = pending_block_tx {
if let Some(tx) = last_pending_block_world_tx {
if event.transaction_hash == tx {
continue;
}
Expand All @@ -302,7 +309,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
async fn fetch_pending(
&self,
block_number: u64,
pending_block_tx: Option<Felt>,
last_pending_block_tx: Option<Felt>,

Check warning on line 312 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L312

Added line #L312 was not covered by tests
) -> Result<Option<FetchPendingResult>> {
let block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
self.provider.get_block_with_receipts(BlockId::Tag(BlockTag::Pending)).await?
Expand All @@ -318,7 +325,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
Ok(Some(FetchPendingResult {
pending_block: Box::new(block),
block_number,
pending_block_tx,
last_pending_block_tx,

Check warning on line 328 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L328

Added line #L328 was not covered by tests
}))
}

Expand All @@ -339,17 +346,21 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> {
// Skip transactions that have been processed already
// Our cursor is the last processed transaction
let mut pending_block_tx_cursor = data.pending_block_tx;
let mut pending_block_tx = data.pending_block_tx;

let mut last_pending_block_tx_cursor = data.last_pending_block_tx;
let mut last_pending_block_tx = data.last_pending_block_tx;
let mut last_pending_block_world_tx = None;

Check warning on line 353 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L349-L353

Added lines #L349 - L353 were not covered by tests
let timestamp = data.pending_block.timestamp;

for t in data.pending_block.transactions {
let transaction_hash = t.transaction.transaction_hash();
if let Some(tx) = pending_block_tx_cursor {
if let Some(tx) = last_pending_block_tx_cursor {

Check warning on line 358 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L358

Added line #L358 was not covered by tests
if transaction_hash != &tx {
continue;
}

pending_block_tx_cursor = None;
last_pending_block_tx_cursor = None;

Check warning on line 363 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L363

Added line #L363 was not covered by tests
continue;
}

Expand All @@ -362,7 +373,11 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
// provider. So we can fail silently and try
// again in the next iteration.
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt.");
self.db.set_head(data.block_number - 1, pending_block_tx);
self.db.set_head(
data.block_number - 1,
last_pending_block_world_tx,
last_pending_block_tx,
);

Check warning on line 380 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L376-L380

Added lines #L376 - L380 were not covered by tests
return Ok(());
}
_ => {
Expand All @@ -372,18 +387,20 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
}
}
Ok(true) => {
pending_block_tx = Some(*transaction_hash);
last_pending_block_world_tx = Some(*transaction_hash);
last_pending_block_tx = Some(*transaction_hash);

Check warning on line 391 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L390-L391

Added lines #L390 - L391 were not covered by tests
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction.");
}
Ok(_) => {
last_pending_block_tx = Some(*transaction_hash);

Check warning on line 395 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L395

Added line #L395 was not covered by tests
info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.")
}
}
}

// Set the head to the last processed pending transaction
// Head block number should still be latest block number
self.db.set_head(data.block_number - 1, pending_block_tx);
self.db.set_head(data.block_number - 1, last_pending_block_world_tx, last_pending_block_tx);

Check warning on line 403 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L403

Added line #L403 was not covered by tests

self.db.execute().await?;
Ok(())
Expand Down Expand Up @@ -426,7 +443,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
// so once the sync range is done, we assume all of the tx of the block
// have been processed.

self.db.set_head(data.latest_block_number, None);
self.db.set_head(data.latest_block_number, None, None);
self.db.execute().await?;

Ok(())
Expand Down
36 changes: 25 additions & 11 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,33 +65,47 @@ impl Sql {
})
}

pub async fn head(&self) -> Result<(u64, Option<Felt>)> {
pub async fn head(&self) -> Result<(u64, Option<Felt>, Option<Felt>)> {

Check warning on line 68 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L68

Added line #L68 was not covered by tests
let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
let indexer_query = sqlx::query_as::<_, (Option<i64>, Option<String>, String)>(
"SELECT head, pending_block_tx, contract_type FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", self.world_address));

let indexer: (Option<i64>, Option<String>, String) =
let indexer_query =
sqlx::query_as::<_, (Option<i64>, Option<String>, Option<String>, String)>(
"SELECT head, last_pending_block_world_tx, last_pending_block_tx, contract_type \
FROM contracts WHERE id = ?",
)
.bind(format!("{:#x}", self.world_address));

Check warning on line 75 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L70-L75

Added lines #L70 - L75 were not covered by tests

let indexer: (Option<i64>, Option<String>, Option<String>, String) =

Check warning on line 77 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L77

Added line #L77 was not covered by tests
indexer_query.fetch_one(&mut *conn).await?;
Ok((
indexer.0.map(|h| h.try_into().expect("doesn't fit in u64")).unwrap_or(0),
indexer.1.map(|f| Felt::from_str(&f)).transpose()?,
indexer.2.map(|f| Felt::from_str(&f)).transpose()?,

Check warning on line 82 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L82

Added line #L82 was not covered by tests
))
}

pub fn set_head(&mut self, head: u64, pending_block_tx: Option<Felt>) {
pub fn set_head(
&mut self,
head: u64,
last_pending_block_world_tx: Option<Felt>,
last_pending_block_tx: Option<Felt>,
) {
let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
let id = Argument::FieldElement(self.world_address);
let pending_block_tx = if let Some(f) = pending_block_tx {
let last_pending_block_world_tx = if let Some(f) = last_pending_block_world_tx {
Argument::String(format!("{:#x}", f))

Check warning on line 95 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L95

Added line #L95 was not covered by tests
} else {
Argument::Null
};
let last_pending_block_tx = if let Some(f) = last_pending_block_tx {
Argument::String(format!("{:#x}", f))
} else {
Argument::Null
};

self.query_queue.enqueue(
"UPDATE contracts SET head = ?, pending_block_tx = ? WHERE id = ?",
vec![head, pending_block_tx, id],
"UPDATE contracts SET head = ?, last_pending_block_world_tx = ?, \
last_pending_block_tx = ? WHERE id = ?",
vec![head, last_pending_block_world_tx, last_pending_block_tx, id],
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Rename pending_block_tx to last_pending_block_world_tx
ALTER TABLE contracts RENAME COLUMN pending_block_tx TO last_pending_block_world_tx;

-- Add new column last_pending_block_tx
ALTER TABLE contracts ADD COLUMN last_pending_block_tx TEXT;

0 comments on commit 0ca96a5

Please sign in to comment.