Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx: Remove tx_broadcast transaction from the pool #4050

Merged
merged 5 commits into from
Apr 18, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,26 @@ use std::{collections::HashMap, sync::Arc};
use super::error::ErrorBroadcast;

/// An API for transaction RPC calls.
pub struct TransactionBroadcast<Pool, Client> {
pub struct TransactionBroadcast<Pool: TransactionPool, Client> {
/// Substrate client.
client: Arc<Client>,
/// Transactions pool.
pool: Arc<Pool>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// The broadcast operation IDs.
broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState>>>,
broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState<Pool>>>>,
}

/// The state of a broadcast operation.
struct BroadcastState {
struct BroadcastState<Pool: TransactionPool> {
/// Handle to abort the running future that broadcasts the transaction.
handle: AbortHandle,
/// Associated tx hash.
tx_hash: Option<<Pool as TransactionPool>::Hash>,
}

impl<Pool, Client> TransactionBroadcast<Pool, Client> {
impl<Pool: TransactionPool, Client> TransactionBroadcast<Pool, Client> {
/// Creates a new [`TransactionBroadcast`].
pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() }
Expand Down Expand Up @@ -111,12 +113,21 @@ where
|notification| async move { notification.is_new_best.then_some(notification.hash) },
));

let broadcast_ids = self.broadcast_ids.clone();
let current_id = id.clone();
let broadcast_transaction_fut = async move {
// There is nothing we could do with an extrinsic of invalid format.
let Ok(decoded_extrinsic) = TransactionFor::<Pool>::decode(&mut &bytes[..]) else {
return;
};

// Save the tx hash to remove it later.
let tx_hash = pool.hash_of(&decoded_extrinsic);
broadcast_ids
.write()
.get_mut(&current_id)
.map(|state| state.tx_hash = Some(tx_hash));
lexnv marked this conversation as resolved.
Show resolved Hide resolved

// Flag to determine if the we should broadcast the transaction again.
let mut is_done = false;

Expand Down Expand Up @@ -169,17 +180,29 @@ where
let (fut, handle) = futures::future::abortable(broadcast_transaction_fut);
let broadcast_ids = self.broadcast_ids.clone();
let drop_id = id.clone();
let pool = self.pool.clone();
// The future expected by the executor must be `Future<Output = ()>` instead of
// `Future<Output = Result<(), Aborted>>`.
let fut = fut.map(move |_| {
let fut = fut.map(move |result| {
// Remove the entry from the broadcast IDs map.
broadcast_ids.write().remove(&drop_id);
let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };

// The broadcast was not stopped.
if result.is_ok() {
return
}

// Tx is associated with the terminated broadcast future.
let Some(tx_hash) = broadcast_state.tx_hash else { return };

// Best effort pool removal (tx can already be finalized).
pool.remove_invalid(&[tx_hash]);
});

// Keep track of this entry and the abortable handle.
{
let mut broadcast_ids = self.broadcast_ids.write();
broadcast_ids.insert(id.clone(), BroadcastState { handle });
broadcast_ids.insert(id.clone(), BroadcastState { handle, tx_hash: None });
}

sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
Expand Down
Loading