Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(katana-pool): enable ordering using BTreeSet #2370

Merged
merged 4 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ katana-codecs = { path = "crates/katana/storage/codecs" }
katana-codecs-derive = { path = "crates/katana/storage/codecs/derive" }
katana-core = { path = "crates/katana/core", default-features = false }
katana-db = { path = "crates/katana/storage/db" }
katana-executor = { path = "crates/katana/executor", default-features = false }
katana-executor = { path = "crates/katana/executor" }
katana-node = { path = "crates/katana/node", default-features = false }
katana-pool = { path = "crates/katana/pool" }
katana-primitives = { path = "crates/katana/primitives" }
Expand Down
125 changes: 118 additions & 7 deletions crates/katana/pool/src/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
impl Ord for TxSubmissionNonce {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Reverse the ordering so lower values have higher priority
other.0.cmp(&self.0)
self.0.cmp(&other.0)
}
}

Expand All @@ -76,25 +76,136 @@
/// This ordering implementation uses the transaction's tip as the priority value. We don't have a
/// use case for this ordering implementation yet, but it's mostly used for testing.
#[derive(Debug)]
pub struct Tip<T>(PhantomData<T>);
pub struct TipOrdering<T>(PhantomData<T>);

impl<T> Tip<T> {
impl<T> TipOrdering<T> {
pub fn new() -> Self {
Self(PhantomData)
}
}

impl<T: PoolTransaction> PoolOrd for Tip<T> {
#[derive(Debug, Clone)]
pub struct Tip(u64);

impl Ord for Tip {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.0.cmp(&self.0)
}
}

impl PartialOrd for Tip {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}

Check warning on line 99 in crates/katana/pool/src/ordering.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pool/src/ordering.rs#L97-L99

Added lines #L97 - L99 were not covered by tests
}

impl PartialEq for Tip {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}

Check warning on line 105 in crates/katana/pool/src/ordering.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/pool/src/ordering.rs#L103-L105

Added lines #L103 - L105 were not covered by tests
}

impl Eq for Tip {}

impl<T: PoolTransaction> PoolOrd for TipOrdering<T> {
type Transaction = T;
type PriorityValue = u64;
type PriorityValue = Tip;

fn priority(&self, tx: &Self::Transaction) -> Self::PriorityValue {
tx.tip()
Tip(tx.tip())
}
}

impl<T> Default for Tip<T> {
impl<T> Default for TipOrdering<T> {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {

use crate::ordering::{self, FiFo};
use crate::pool::test_utils::*;
use crate::tx::PoolTransaction;
use crate::validation::NoopValidator;
use crate::{Pool, TransactionPool};

#[test]
fn fifo_ordering() {
// Create mock transactions
let txs = [PoolTx::new(), PoolTx::new(), PoolTx::new(), PoolTx::new(), PoolTx::new()];

// Create a pool with FiFo ordering
let pool = Pool::new(NoopValidator::new(), FiFo::new());

// Add transactions to the pool
txs.iter().for_each(|tx| {
let _ = pool.add_transaction(tx.clone());
});

// Get pending transactions
let pendings = pool.take_transactions().collect::<Vec<_>>();

// Assert that the transactions are in the order they were added (first to last)
pendings.iter().zip(txs).for_each(|(pending, tx)| {
assert_eq!(pending.tx.as_ref(), &tx);
});
}

#[test]
fn tip_based_ordering() {
// Create mock transactions with different tips and in random order
let txs = [
PoolTx::new().with_tip(2),
PoolTx::new().with_tip(1),
PoolTx::new().with_tip(6),
PoolTx::new().with_tip(3),
PoolTx::new().with_tip(2),
PoolTx::new().with_tip(2),
PoolTx::new().with_tip(5),
PoolTx::new().with_tip(4),
PoolTx::new().with_tip(7),
];

// Create a pool with tip-based ordering
let pool = Pool::new(NoopValidator::new(), ordering::TipOrdering::new());

// Add transactions to the pool
txs.iter().for_each(|tx| {
let _ = pool.add_transaction(tx.clone());
});

// Get pending transactions
let pending = pool.take_transactions().collect::<Vec<_>>();
assert_eq!(pending.len(), txs.len());

// Assert that the transactions are ordered by tip (highest to lowest)
assert_eq!(pending[0].tx.tip(), 7);
assert_eq!(pending[0].tx.hash(), txs[8].hash());

assert_eq!(pending[1].tx.tip(), 6);
assert_eq!(pending[1].tx.hash(), txs[2].hash());

assert_eq!(pending[2].tx.tip(), 5);
assert_eq!(pending[2].tx.hash(), txs[6].hash());

assert_eq!(pending[3].tx.tip(), 4);
assert_eq!(pending[3].tx.hash(), txs[7].hash());

assert_eq!(pending[4].tx.tip(), 3);
assert_eq!(pending[4].tx.hash(), txs[3].hash());

assert_eq!(pending[5].tx.tip(), 2);
assert_eq!(pending[5].tx.hash(), txs[0].hash());

assert_eq!(pending[6].tx.tip(), 2);
assert_eq!(pending[6].tx.hash(), txs[4].hash());

assert_eq!(pending[7].tx.tip(), 2);
assert_eq!(pending[7].tx.hash(), txs[5].hash());

assert_eq!(pending[8].tx.tip(), 1);
assert_eq!(pending[8].tx.hash(), txs[1].hash());
}
}
150 changes: 8 additions & 142 deletions crates/katana/pool/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::fmt;
use std::collections::btree_set::IntoIter;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::vec::IntoIter;

use futures::channel::mpsc::{channel, Receiver, Sender};
use katana_primitives::transaction::TxHash;
Expand All @@ -24,8 +25,8 @@ where

#[derive(Debug)]
struct Inner<T, V, O: PoolOrd> {
/// List of all valid txs in the pool
transactions: RwLock<Vec<PendingTx<T, O>>>,
/// List of all valid txs in the pool.
transactions: RwLock<BTreeSet<PendingTx<T, O>>>,

/// listeners for incoming txs
listeners: RwLock<Vec<Sender<TxHash>>>,
Expand Down Expand Up @@ -108,7 +109,7 @@ where
let tx = PendingTx::new(id, tx, priority);

// insert the tx in the pool
self.inner.transactions.write().push(tx);
self.inner.transactions.write().insert(tx);
self.notify_listener(hash);

Ok(hash)
Expand Down Expand Up @@ -215,15 +216,14 @@ pub(crate) mod test_utils {

use super::*;
use crate::tx::PoolTransaction;
use crate::validation::{ValidationOutcome, ValidationResult, Validator};

fn random_bytes<const SIZE: usize>() -> [u8; SIZE] {
let mut bytes = [0u8; SIZE];
rand::thread_rng().fill(&mut bytes[..]);
bytes
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PoolTx {
tip: u64,
nonce: Nonce,
Expand Down Expand Up @@ -284,36 +284,6 @@ pub(crate) mod test_utils {
self.tip
}
}

/// A tip-based validator that flags transactions as invalid if they have less than 10 tip.
pub struct TipValidator<T> {
threshold: u64,
t: std::marker::PhantomData<T>,
}

impl<T> TipValidator<T> {
pub fn new(threshold: u64) -> Self {
Self { threshold, t: std::marker::PhantomData }
}
}

impl<T: PoolTransaction> Validator for TipValidator<T> {
type Transaction = T;

fn validate(&self, tx: Self::Transaction) -> ValidationResult<Self::Transaction> {
if tx.tip() < self.threshold {
return ValidationResult::Ok(ValidationOutcome::Invalid {
error: InvalidTransactionError::InsufficientFunds {
balance: FieldElement::ONE,
max_fee: tx.max_fee(),
},
tx,
});
}

ValidationResult::Ok(ValidationOutcome::Valid(tx))
}
}
}

#[cfg(test)]
Expand All @@ -324,10 +294,9 @@ mod tests {

use super::test_utils::*;
use super::Pool;
use crate::ordering::{self, FiFo};
use crate::pool::test_utils;
use crate::ordering::FiFo;
use crate::tx::PoolTransaction;
use crate::validation::{NoopValidator, ValidationOutcome, Validator};
use crate::validation::NoopValidator;
use crate::TransactionPool;

/// Tx pool that uses a noop validator and a first-come-first-serve ordering.
Expand Down Expand Up @@ -424,109 +393,6 @@ mod tests {
assert_eq!(counter, txs.len());
}

#[test]
#[ignore = "Rejected pool not implemented yet"]
fn transactions_rejected() {
let all = [
PoolTx::new().with_tip(5),
PoolTx::new().with_tip(0),
PoolTx::new().with_tip(15),
PoolTx::new().with_tip(8),
PoolTx::new().with_tip(12),
PoolTx::new().with_tip(10),
PoolTx::new().with_tip(1),
];

// create a pool with a validator that rejects txs with tip < 10
let pool = Pool::new(test_utils::TipValidator::new(10), FiFo::new());

// Extract the expected valid and invalid transactions from the all list
let (expected_valids, expected_invalids) = pool
.validator()
.validate_all(all.to_vec())
.into_iter()
.filter_map(|res| res.ok())
.fold((Vec::new(), Vec::new()), |mut acc, res| match res {
ValidationOutcome::Valid(tx) => {
acc.0.push(tx);
acc
}

ValidationOutcome::Invalid { tx, .. } => {
acc.1.push(tx);
acc
}

ValidationOutcome::Dependent { tx, .. } => {
acc.0.push(tx);
acc
}
});

assert_eq!(expected_valids.len(), 3);
assert_eq!(expected_invalids.len(), 4);

// Add all transactions to the pool
all.iter().for_each(|tx| {
let _ = pool.add_transaction(tx.clone());
});

// Check that all transactions should be in the pool regardless of validity
assert!(all.iter().all(|tx| pool.get(tx.hash()).is_some()));
assert_eq!(pool.size(), all.len());

// Pending transactions should only contain the valid transactions
let pendings = pool.take_transactions().collect::<Vec<_>>();
assert_eq!(pendings.len(), expected_valids.len());

// bcs its a fcfs pool, the order of the pending txs should be the as its order of insertion
// (position in the array)
for (actual, expected) in pendings.iter().zip(expected_valids.iter()) {
assert_eq!(actual.tx.hash(), expected.hash());
}

// // rejected_txs should contain all the invalid txs
// assert_eq!(pool.inner.rejected.read().len(), expected_invalids.len());
// for tx in expected_invalids.iter() {
// assert!(pool.inner.rejected.read().contains_key(&tx.hash()));
// }
}

#[test]
#[ignore = "Txs ordering not fully implemented yet"]
fn txs_ordering() {
// Create mock transactions with different tips and in random order
let txs = [
PoolTx::new().with_tip(1),
PoolTx::new().with_tip(6),
PoolTx::new().with_tip(3),
PoolTx::new().with_tip(2),
PoolTx::new().with_tip(5),
PoolTx::new().with_tip(4),
PoolTx::new().with_tip(7),
];

// Create a pool with tip-based ordering
let pool = Pool::new(NoopValidator::new(), ordering::Tip::new());

// Add transactions to the pool
txs.iter().for_each(|tx| {
let _ = pool.add_transaction(tx.clone());
});

// Get pending transactions
let pending = pool.take_transactions().collect::<Vec<_>>();

// Assert that the transactions are ordered by tip (highest to lowest)
assert_eq!(pending[0].tx.tip(), 7);
assert_eq!(pending[1].tx.tip(), 6);
assert_eq!(pending[2].tx.tip(), 5);
assert_eq!(pending[3].tx.tip(), 4);
assert_eq!(pending[4].tx.tip(), 3);
assert_eq!(pending[5].tx.tip(), 2);
assert_eq!(pending[6].tx.tip(), 1);
}

#[test]
#[ignore = "Txs dependency management not fully implemented yet"]
fn dependent_txs_linear_insertion() {
Expand Down
10 changes: 9 additions & 1 deletion crates/katana/pool/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,17 @@ impl<T, O: PoolOrd> PartialOrd for PendingTx<T, O> {
}
}

// When two transactions have the same priority, we want to prioritize the one that was added
// first. So, when an incoming transaction with similar priority value is added to the
// [BTreeSet](std::collections::BTreeSet), the transaction is assigned a 'greater'
// [Ordering](std::cmp::Ordering) so that it will be placed after the existing ones. This is
// because items in a BTree is ordered from lowest to highest.
impl<T, O: PoolOrd> Ord for PendingTx<T, O> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.priority.cmp(&other.priority)
match self.priority.cmp(&other.priority) {
std::cmp::Ordering::Equal => std::cmp::Ordering::Greater,
other => other,
}
}
}

Expand Down
Loading