diff --git a/Cargo.lock b/Cargo.lock index 69395bf281e8..59891f7993a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17404,6 +17404,7 @@ dependencies = [ "sc-transaction-pool", "sc-transaction-pool-api", "sc-utils", + "schnellru", "serde", "serde_json", "sp-api", diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index e2612d914542..f2fc7bee6e20 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -42,6 +42,7 @@ array-bytes = "6.1" log = { workspace = true, default-features = true } futures-util = { version = "0.3.30", default-features = false } rand = "0.8.5" +schnellru = "0.2.1" [dev-dependencies] jsonrpsee = { version = "0.22", features = ["server", "ws-client"] } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 86d9a726d7be..6779180a4146 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -75,7 +75,7 @@ pub struct ChainHeadConfig { /// Maximum pinned blocks across all connections. /// This number is large enough to consider immediate blocks. /// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db. -const MAX_PINNED_BLOCKS: usize = 512; +pub(crate) const MAX_PINNED_BLOCKS: usize = 512; /// Any block of any subscription should not be pinned more than /// this constant. When a subscription contains a block older than this, diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 0d87a45c07e2..a753896b24c2 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -19,7 +19,7 @@ //! Implementation of the `chainHead_follow` method. use crate::chain_head::{ - chain_head::LOG_TARGET, + chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS}, event::{ BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, @@ -37,6 +37,7 @@ use sc_client_api::{ Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification, }; use sc_rpc::utils::to_sub_message; +use schnellru::{ByLength, LruMap}; use sp_api::CallApiAt; use sp_blockchain::{ Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info, @@ -68,7 +69,9 @@ pub struct ChainHeadFollower, Block: BlockT, Client> { /// Subscription ID. sub_id: String, /// The best reported block by this subscription. - best_block_cache: Option, + current_best_block: Option, + /// LRU cache of pruned blocks. + pruned_blocks: LruMap, /// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. max_lagging_distance: usize, @@ -90,7 +93,10 @@ impl, Block: BlockT, Client> ChainHeadFollower, - ) -> Result<(Vec>, HashSet), SubscriptionManagementError> - { + ) -> Result>, SubscriptionManagementError> { let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?; // The initialized event is the first one sent. let initial_blocks = init.finalized_block_descendants; let finalized_block_hashes = init.finalized_block_hashes; + // These are the pruned blocks that we should not report again. + for pruned in init.pruned_forks { + self.pruned_blocks.insert(pruned, ()); + } let finalized_block_hash = startup_point.finalized_hash; let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None); @@ -345,11 +353,11 @@ where let best_block_hash = startup_point.best_hash; if best_block_hash != finalized_block_hash { let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }); - self.best_block_cache = Some(best_block_hash); + self.current_best_block = Some(best_block_hash); finalized_block_descendants.push(best_block); }; - Ok((finalized_block_descendants, init.pruned_forks)) + Ok(finalized_block_descendants) } /// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for the @@ -377,19 +385,19 @@ where let best_block_event = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash }); - match self.best_block_cache { + match self.current_best_block { Some(block_cache) => { // The RPC layer has not reported this block as best before. // Note: This handles the race with the finalized branch. if block_cache != block_hash { - self.best_block_cache = Some(block_hash); + self.current_best_block = Some(block_hash); vec![new_block, best_block_event] } else { vec![new_block] } }, None => { - self.best_block_cache = Some(block_hash); + self.current_best_block = Some(block_hash); vec![new_block, best_block_event] }, } @@ -458,7 +466,7 @@ where // When the node falls out of sync and then syncs up to the tip of the chain, it can // happen that we skip notifications. Then it is better to terminate the connection // instead of trying to send notifications for all missed blocks. - if let Some(best_block_hash) = self.best_block_cache { + if let Some(best_block_hash) = self.current_best_block { let ancestor = sp_blockchain::lowest_common_ancestor( &*self.client, *hash, @@ -481,13 +489,10 @@ where } /// Get all pruned block hashes from the provided stale heads. - /// - /// The result does not include hashes from `to_ignore`. fn get_pruned_hashes( - &self, + &mut self, stale_heads: &[Block::Hash], last_finalized: Block::Hash, - to_ignore: &mut HashSet, ) -> Result, SubscriptionManagementError> { let blockchain = self.backend.blockchain(); let mut pruned = Vec::new(); @@ -497,11 +502,13 @@ where // Collect only blocks that are not part of the canonical chain. pruned.extend(tree_route.enacted().iter().filter_map(|block| { - if !to_ignore.remove(&block.hash) { - Some(block.hash) - } else { - None + if self.pruned_blocks.get(&block.hash).is_some() { + // The block was already reported as pruned. + return None } + + self.pruned_blocks.insert(block.hash, ()); + Some(block.hash) })) } @@ -515,7 +522,6 @@ where fn handle_finalized_blocks( &mut self, notification: FinalityNotification, - to_ignore: &mut HashSet, startup_point: &StartupPoint, ) -> Result>, SubscriptionManagementError> { let last_finalized = notification.hash; @@ -536,25 +542,32 @@ where // Report all pruned blocks from the notification that are not // part of the fork we need to ignore. let pruned_block_hashes = - self.get_pruned_hashes(¬ification.stale_heads, last_finalized, to_ignore)?; + self.get_pruned_hashes(¬ification.stale_heads, last_finalized)?; let finalized_event = FollowEvent::Finalized(Finalized { finalized_block_hashes, pruned_block_hashes: pruned_block_hashes.clone(), }); - match self.best_block_cache { - Some(block_cache) => { - // If the best block wasn't pruned, we are done here. - if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) { - events.push(finalized_event); - return Ok(events) - } - - // The best block is reported as pruned. Therefore, we need to signal a new - // best block event before submitting the finalized event. + if let Some(current_best_block) = self.current_best_block { + // The best reported block is in the pruned list. Report a new best block. + let is_in_pruned_list = + pruned_block_hashes.iter().any(|hash| *hash == current_best_block); + // The block is not the last finalized block. + // + // It can be either: + // - a descendant of the last finalized block + // - a block on a fork that will be pruned in the future. + // + // In those cases, we emit a new best block. + let is_not_last_finalized = current_best_block != last_finalized; + + if is_in_pruned_list || is_not_last_finalized { + // We need to generate a best block event. let best_block_hash = self.client.info().best_hash; - if best_block_hash == block_cache { + + // Defensive check against state missmatch. + if best_block_hash == current_best_block { // The client doest not have any new information about the best block. // The information from `.info()` is updated from the DB as the last // step of the finalization and it should be up to date. @@ -564,23 +577,18 @@ where "[follow][id={:?}] Client does not contain different best block", self.sub_id, ); - events.push(finalized_event); - Ok(events) } else { // The RPC needs to also submit a new best block changed before the // finalized event. - self.best_block_cache = Some(best_block_hash); - let best_block_event = - FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }); - events.extend([best_block_event, finalized_event]); - Ok(events) + self.current_best_block = Some(best_block_hash); + events + .push(FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash })); } - }, - None => { - events.push(finalized_event); - Ok(events) - }, + } } + + events.push(finalized_event); + Ok(events) } /// Submit the events from the provided stream to the RPC client @@ -589,7 +597,6 @@ where &mut self, startup_point: &StartupPoint, mut stream: EventStream, - mut to_ignore: HashSet, sink: SubscriptionSink, rx_stop: oneshot::Receiver<()>, ) -> Result<(), SubscriptionManagementError> @@ -612,7 +619,7 @@ where NotificationType::NewBlock(notification) => self.handle_import_blocks(notification, &startup_point), NotificationType::Finalized(notification) => - self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point), + self.handle_finalized_blocks(notification, &startup_point), NotificationType::MethodResponse(notification) => Ok(vec![notification]), }; @@ -682,7 +689,7 @@ where .map(|response| NotificationType::MethodResponse(response)); let startup_point = StartupPoint::from(self.client.info()); - let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) { + let initial_events = match self.generate_init_events(&startup_point) { Ok(blocks) => blocks, Err(err) => { debug!( @@ -702,7 +709,6 @@ where let merged = tokio_stream::StreamExt::merge(merged, stream_responses); let stream = stream::once(futures::future::ready(initial)).chain(merged); - self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop) - .await + self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await } } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index c2bff7c50d5e..14f664858a0d 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -187,6 +187,62 @@ async fn setup_api() -> ( (client, api, sub, sub_id, block) } +async fn import_block( + mut client: Arc>, + parent_hash: ::Hash, + parent_number: u64, +) -> Block { + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(parent_hash) + .with_parent_block_number(parent_number) + .build() + .unwrap() + .build() + .unwrap() + .block; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + block +} + +async fn import_best_block_with_tx( + mut client: Arc>, + parent_hash: ::Hash, + parent_number: u64, + tx: Transfer, +) -> Block { + let mut block_builder = BlockBuilderBuilder::new(&*client) + .on_parent_block(parent_hash) + .with_parent_block_number(parent_number) + .build() + .unwrap(); + block_builder.push_transfer(tx).unwrap(); + let block = block_builder.build().unwrap().block; + client.import_as_best(BlockOrigin::Own, block.clone()).await.unwrap(); + block +} + +/// Check the subscription produces a new block and a best block event. +/// +/// The macro is used instead of a fn to preserve the lines of code in case of panics. +macro_rules! check_new_and_best_block_events { + ($sub:expr, $block_hash:expr, $parent_hash:expr) => { + let event: FollowEvent = get_next_event($sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", $block_hash), + parent_block_hash: format!("{:?}", $parent_hash), + new_runtime: None, + with_runtime: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event($sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", $block_hash), + }); + assert_eq!(event, expected); + }; +} + #[tokio::test] async fn follow_subscription_produces_blocks() { let builder = TestClientBuilder::new(); @@ -3644,3 +3700,160 @@ async fn chain_head_limit_reached() { // Initialized must always be reported first. let _event: FollowEvent = get_next_event(&mut sub).await; } + +#[tokio::test] +async fn follow_unique_pruned_blocks() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + max_lagging_distance: MAX_LAGGING_DISTANCE, + }, + ) + .into_rpc(); + + let finalized_hash = client.info().finalized_hash; + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap(); + + // Initialized must always be reported first. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Initialized(Initialized { + finalized_block_hashes: vec![format!("{:?}", finalized_hash)], + finalized_block_runtime: None, + with_runtime: false, + }); + assert_eq!(event, expected); + + // Block tree: + // + // finalized -> block 1 -> block 2 -> block 3 + // + // -> block 2 -> block 4 -> block 5 + // + // -> block 1 -> block 2_f -> block 6 + // ^^^ finalized + // -> block 7 + // ^^^ finalized + // -> block 8 + // ^^^ finalized + // The chainHead will see block 5 as the best block. However, the + // client will finalize the block 6, which is on another fork. + // + // When the block 6 is finalized, block 2 block 3 block 4 and block 5 are placed on an invalid + // fork. However, pruning of blocks happens on level N - 1. + // Therefore, no pruned blocks are reported yet. + // + // When the block 7 is finalized, block 3 is detected as stale. At this step, block 2 and 3 + // are reported as pruned. + // + // When the block 8 is finalized, block 5 block 4 and block 2 are detected as stale. However, + // only blocks 5 and 4 are reported as pruned. This is because the block 2 was previously + // reported. + + // Initial setup steps: + let block_1_hash = + import_block(client.clone(), client.chain_info().genesis_hash, 0).await.hash(); + let block_2_f_hash = import_block(client.clone(), block_1_hash, 1).await.hash(); + let block_6_hash = import_block(client.clone(), block_2_f_hash, 2).await.hash(); + // Import block 2 as best on the fork. + let mut tx_alice_ferdie = Transfer { + from: AccountKeyring::Alice.into(), + to: AccountKeyring::Ferdie.into(), + amount: 41, + nonce: 0, + }; + let block_2_hash = + import_best_block_with_tx(client.clone(), block_1_hash, 1, tx_alice_ferdie.clone()) + .await + .hash(); + + let block_3_hash = import_block(client.clone(), block_2_hash, 2).await.hash(); + // Fork block 4. + tx_alice_ferdie.nonce = 1; + let block_4_hash = import_best_block_with_tx(client.clone(), block_2_hash, 2, tx_alice_ferdie) + .await + .hash(); + let block_5_hash = import_block(client.clone(), block_4_hash, 3).await.hash(); + + // Check expected events generated by the setup. + { + // Check block 1 -> block 2f -> block 6. + check_new_and_best_block_events!(&mut sub, block_1_hash, finalized_hash); + check_new_and_best_block_events!(&mut sub, block_2_f_hash, block_1_hash); + check_new_and_best_block_events!(&mut sub, block_6_hash, block_2_f_hash); + + // Check (block 1 ->) block 2 -> block 3. + check_new_and_best_block_events!(&mut sub, block_2_hash, block_1_hash); + check_new_and_best_block_events!(&mut sub, block_3_hash, block_2_hash); + + // Check (block 1 -> block 2 ->) block 4 -> block 5. + check_new_and_best_block_events!(&mut sub, block_4_hash, block_2_hash); + check_new_and_best_block_events!(&mut sub, block_5_hash, block_4_hash); + } + + // Finalize the block 6 from the fork. + client.finalize_block(block_6_hash, None).unwrap(); + + // Expect to report the best block changed before the finalized event. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_6_hash), + }); + assert_eq!(event, expected); + + // Block 2 must be reported as pruned, even if it was the previous best. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![ + format!("{:?}", block_1_hash), + format!("{:?}", block_2_f_hash), + format!("{:?}", block_6_hash), + ], + pruned_block_hashes: vec![], + }); + assert_eq!(event, expected); + + // Pruned hash can be unpinned. + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + let hash = format!("{:?}", block_2_hash); + let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap(); + + // Import block 7 and check it. + let block_7_hash = import_block(client.clone(), block_6_hash, 3).await.hash(); + check_new_and_best_block_events!(&mut sub, block_7_hash, block_6_hash); + + // Finalize the block 7. + client.finalize_block(block_7_hash, None).unwrap(); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![format!("{:?}", block_7_hash)], + pruned_block_hashes: vec![format!("{:?}", block_2_hash), format!("{:?}", block_3_hash)], + }); + assert_eq!(event, expected); + + // Check block 8. + let block_8_hash = import_block(client.clone(), block_7_hash, 4).await.hash(); + check_new_and_best_block_events!(&mut sub, block_8_hash, block_7_hash); + + // Finalize the block 8. + client.finalize_block(block_8_hash, None).unwrap(); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![format!("{:?}", block_8_hash)], + pruned_block_hashes: vec![format!("{:?}", block_4_hash), format!("{:?}", block_5_hash)], + }); + assert_eq!(event, expected); +}