diff --git a/crates/katana/rpc/rpc/src/starknet/mod.rs b/crates/katana/rpc/rpc/src/starknet/mod.rs index d3a0711116..37951a44b2 100644 --- a/crates/katana/rpc/rpc/src/starknet/mod.rs +++ b/crates/katana/rpc/rpc/src/starknet/mod.rs @@ -510,10 +510,10 @@ fn fill_pending_events( if let ExecutionResult::Success { receipt, .. } = res { let events = receipt.events(); - let txn_events_len = events.len(); + let tx_events_len = events.len(); // check if cursor.event_n is correct - match (txn_events_len as u64).cmp(&cursor.event_n) { + match (tx_events_len as u64).cmp(&cursor.event_n) { Ordering::Less => { return Err(StarknetApiError::InvalidContinuationToken); } @@ -527,29 +527,48 @@ fn fill_pending_events( // calculate the remaining capacity based on the chunk size and the current // number of events we have taken. - let remaining_capacity = chunk_size as usize - buffer.len(); + let total_can_take = (chunk_size as usize).saturating_sub(tx_events_len); // skip events according to the continuation token. - let filtered_events = filter_events(events.iter(), filter.clone()) + let filtered = filter_events(events.iter(), filter.clone()) + .enumerate() .skip(cursor.event_n as usize) - .take(remaining_capacity) - .map(|e| EmittedEvent { - block_hash: None, - block_number: None, - keys: e.keys.clone(), - data: e.data.clone(), - transaction_hash: tx.hash, - from_address: e.from_address.into(), + .take(total_can_take) + .map(|(i, e)| { + ( + i, + EmittedEvent { + block_hash: None, + block_number: None, + keys: e.keys.clone(), + data: e.data.clone(), + transaction_hash: tx.hash, + from_address: e.from_address.into(), + }, + ) }) .collect::>(); + // remaining possible events that we haven't seen due to the chunk size limit. + let chunk_seen_end = cursor.event_n as usize + total_can_take; + // get the index of the last matching event that we have reached. if there is not + // matching events (ie `filtered` is empty) we point the end of the chunk + // we've covered thus far.. + let last_event_idx = filtered.last().map(|(i, _)| *i).unwrap_or(chunk_seen_end); // the next time we have to fetch the events, we will start from this index. - let new_event_n = cursor.event_n as usize + remaining_capacity; + let new_event_n = if total_can_take == 0 { + // if we haven't taken any events, due to the chunk size limit, we need to start + // from the the same event pointed by the current cursor.. + cursor.event_n as usize + last_event_idx + } else { + // start at the next event of the last event we've filtered out. + cursor.event_n as usize + last_event_idx + 1 + }; - buffer.extend(filtered_events); + buffer.extend(filtered.into_iter().map(|(_, event)| event)); // if there are still more events that we haven't fetched yet for this tx. - if new_event_n < txn_events_len { + if new_event_n < tx_events_len { cursor.event_n = new_event_n as u64; } // reset the event index