fix pending cursor
kariy committed Sep 1, 2024
1 parent 4be6ec6 commit 8e9eff1
Showing 1 changed file with 34 additions and 15 deletions: crates/katana/rpc/rpc/src/starknet/mod.rs
```diff
@@ -510,10 +510,10 @@ fn fill_pending_events(
 
     if let ExecutionResult::Success { receipt, .. } = res {
         let events = receipt.events();
-        let txn_events_len = events.len();
+        let tx_events_len = events.len();
 
         // check if cursor.event_n is correct
-        match (txn_events_len as u64).cmp(&cursor.event_n) {
+        match (tx_events_len as u64).cmp(&cursor.event_n) {
             Ordering::Less => {
                 return Err(StarknetApiError::InvalidContinuationToken);
             }
@@ -527,29 +527,48 @@ fn fill_pending_events(
 
         // calculate the remaining capacity based on the chunk size and the current
         // number of events we have taken.
-        let remaining_capacity = chunk_size as usize - buffer.len();
+        let total_can_take = (chunk_size as usize).saturating_sub(tx_events_len);
 
         // skip events according to the continuation token.
-        let filtered_events = filter_events(events.iter(), filter.clone())
+        let filtered = filter_events(events.iter(), filter.clone())
+            .enumerate()
             .skip(cursor.event_n as usize)
-            .take(remaining_capacity)
-            .map(|e| EmittedEvent {
-                block_hash: None,
-                block_number: None,
-                keys: e.keys.clone(),
-                data: e.data.clone(),
-                transaction_hash: tx.hash,
-                from_address: e.from_address.into(),
+            .take(total_can_take)
+            .map(|(i, e)| {
+                (
+                    i,
+                    EmittedEvent {
+                        block_hash: None,
+                        block_number: None,
+                        keys: e.keys.clone(),
+                        data: e.data.clone(),
+                        transaction_hash: tx.hash,
+                        from_address: e.from_address.into(),
+                    },
+                )
             })
             .collect::<Vec<_>>();
 
+        // remaining possible events that we haven't seen due to the chunk size limit.
+        let chunk_seen_end = cursor.event_n as usize + total_can_take;
+        // get the index of the last matching event that we have reached. if there is not
+        // matching events (ie `filtered` is empty) we point the end of the chunk
+        // we've covered thus far..
+        let last_event_idx = filtered.last().map(|(i, _)| *i).unwrap_or(chunk_seen_end);
         // the next time we have to fetch the events, we will start from this index.
-        let new_event_n = cursor.event_n as usize + remaining_capacity;
+        let new_event_n = if total_can_take == 0 {
+            // if we haven't taken any events, due to the chunk size limit, we need to start
+            // from the the same event pointed by the current cursor..
+            cursor.event_n as usize + last_event_idx
+        } else {
+            // start at the next event of the last event we've filtered out.
+            cursor.event_n as usize + last_event_idx + 1
+        };
 
-        buffer.extend(filtered_events);
+        buffer.extend(filtered.into_iter().map(|(_, event)| event));
 
         // if there are still more events that we haven't fetched yet for this tx.
-        if new_event_n < txn_events_len {
+        if new_event_n < tx_events_len {
            cursor.event_n = new_event_n as u64;
         }
         // reset the event index
```
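The gist of the change: instead of advancing the pending-events cursor by the page's remaining capacity (the old `remaining_capacity` arithmetic), the new code enumerates the events it actually returns and resumes from just past the last one, with a fallback when nothing fits in the current page. The sketch below illustrates that cursor bookkeeping in isolation; `Event`, `next_page`, and `filter_key` are made-up stand-ins rather than Katana's `EmittedEvent`/`filter_events` types, and the arithmetic is simplified, not copied line-for-line from the commit.

```rust
// Illustrative sketch only: made-up `Event`/`next_page`/`filter_key` names,
// not Katana's `EmittedEvent`/`filter_events` API, and simplified arithmetic.

#[derive(Clone)]
struct Event {
    key: u64,
}

/// Take up to `capacity` events matching `filter_key`, starting the scan at
/// `cursor_event_n`, and return them with the cursor for the next page.
fn next_page(
    events: &[Event],
    cursor_event_n: usize,
    capacity: usize,
    filter_key: u64,
) -> (Vec<Event>, usize) {
    // enumerate before filtering so `i` is the event's index within the tx
    let taken: Vec<(usize, Event)> = events
        .iter()
        .enumerate()
        .skip(cursor_event_n)
        .filter(|(_, e)| e.key == filter_key)
        .take(capacity)
        .map(|(i, e)| (i, e.clone()))
        .collect();

    // resume one past the last event actually returned; if nothing was
    // returned, stay put so the next request re-scans from the same spot
    let next_cursor = taken.last().map(|(i, _)| i + 1).unwrap_or(cursor_event_n);

    (taken.into_iter().map(|(_, e)| e).collect(), next_cursor)
}

fn main() {
    // six events with alternating keys; the filter matches indices 0, 2, 4
    let events: Vec<Event> = (0..6u64).map(|i| Event { key: i % 2 }).collect();

    let (page, cursor) = next_page(&events, 0, 2, 0);
    assert_eq!((page.len(), cursor), (2, 3)); // took indices 0 and 2, resume at 3

    let (page, cursor) = next_page(&events, cursor, 2, 0);
    assert_eq!((page.len(), cursor), (1, 5)); // only index 4 was left

    println!("cursor bookkeeping sketch ok");
}
```

Running the sketch walks two pages over six events and checks that the cursor lands just after the last matched event instead of jumping ahead by the full page size, which is the over-advancing behavior the commit appears to be fixing.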
