diff --git a/cmd/zfs_object_agent/zettacache/src/lib.rs b/cmd/zfs_object_agent/zettacache/src/lib.rs
index fd7320a24931..7d89ed132d66 100644
--- a/cmd/zfs_object_agent/zettacache/src/lib.rs
+++ b/cmd/zfs_object_agent/zettacache/src/lib.rs
@@ -13,6 +13,7 @@ mod space_map;
 mod zcachedb;
 mod zettacache;
 
+pub use crate::zettacache::InsertSource;
 pub use crate::zettacache::LookupResponse;
 pub use crate::zettacache::ZettaCache;
 pub use zcachedb::DumpStructuresOptions;
diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
index 1e549286e874..6205719a199c 100644
--- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs
+++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -51,8 +51,8 @@ lazy_static! {
     static ref TARGET_CACHE_SIZE_PCT: u64 = get_tunable("target_cache_size_pct", 80);
     static ref HIGH_WATER_CACHE_SIZE_PCT: u64 = get_tunable("high_water_cache_size_pct", 82);
 
-    // number of insertions that can be buffered before we start dropping them; around a second's worth
-    static ref CACHE_INSERT_MAX_BUFFER: usize = get_tunable("cache_insert_max_buffer", 10_000);
+    static ref CACHE_INSERT_BLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_blocking_buffer_bytes", 256_000_000);
+    static ref CACHE_INSERT_NONBLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_nonblocking_buffer_bytes", 256_000_000);
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -132,7 +132,8 @@ pub struct ZettaCache {
     state: Arc<tokio::sync::Mutex<ZettaCacheState>>,
     outstanding_lookups: LockSet<IndexKey>,
     metrics: Arc<ZettaCacheMetrics>,
-    outstanding_inserts: Arc<Semaphore>,
+    blocking_buffer_bytes_available: Arc<Semaphore>,
+    nonblocking_buffer_bytes_available: Arc<Semaphore>,
 }
 
 #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
@@ -522,6 +523,13 @@ pub enum LookupResponse {
     Absent(LockedKey),
 }
 
+pub enum InsertSource {
+    Heal,
+    Read,
+    SpeculativeRead,
+    Write,
+}
+
 #[metered(registry=ZettaCacheMetrics)]
 impl ZettaCache {
     pub async fn create(path: &str) {
@@ -670,7 +678,12 @@ impl ZettaCache {
             state: Arc::new(tokio::sync::Mutex::new(state)),
             outstanding_lookups: LockSet::new(),
             metrics: Default::default(),
-            outstanding_inserts: Arc::new(Semaphore::new(*CACHE_INSERT_MAX_BUFFER)),
+            blocking_buffer_bytes_available: Arc::new(Semaphore::new(
+                *CACHE_INSERT_BLOCKING_BUFFER_BYTES,
+            )),
+            nonblocking_buffer_bytes_available: Arc::new(Semaphore::new(
+                *CACHE_INSERT_NONBLOCKING_BUFFER_BYTES,
+            )),
         };
 
         let (merge_rx, merge_index) = match checkpoint.merge_progress {
@@ -1033,17 +1046,36 @@ impl ZettaCache {
     #[measure(InFlight)]
     #[measure(Throughput)]
     #[measure(HitCount)]
-    pub fn insert(&self, locked_key: LockedKey, buf: Vec<u8>) {
+    pub async fn insert(&self, locked_key: LockedKey, buf: Vec<u8>, source: InsertSource) {
+        // The passed in buffer is only for a single block, which is capped to SPA_MAXBLOCKSIZE,
+        // and thus we should never have an issue converting the length to a "u32" here.
+        let bytes = u32::try_from(buf.len()).unwrap();
+
         // This permit will be dropped when the write to disk completes. It
         // serves to limit the number of insert()'s that we can buffer before
         // dropping (ignoring) insertion requests.
-        let insert_permit = match self.outstanding_inserts.clone().try_acquire_owned() {
-            Ok(permit) => permit,
-            Err(tokio::sync::TryAcquireError::NoPermits) => {
-                self.insert_failed_max_queue_depth(locked_key.0.value());
-                return;
-            }
-            Err(e) => panic!("unexpected error from try_acquire: {:?}", e),
+        let insert_permit = match source {
+            InsertSource::Heal | InsertSource::SpeculativeRead | InsertSource::Write => match self
+                .nonblocking_buffer_bytes_available
+                .clone()
+                .try_acquire_many_owned(bytes)
+            {
+                Ok(permit) => permit,
+                Err(tokio::sync::TryAcquireError::NoPermits) => {
+                    self.insert_failed_max_queue_depth(locked_key.0.value());
+                    return;
+                }
+                Err(e) => panic!("unexpected error from try_acquire_many_owned: {:?}", e),
+            },
+            InsertSource::Read => match self
+                .blocking_buffer_bytes_available
+                .clone()
+                .acquire_many_owned(bytes)
+                .await
+            {
+                Ok(permit) => permit,
+                Err(e) => panic!("unexpected error from acquire_many_owned: {:?}", e),
+            },
         };
 
         let block_access = self.block_access.clone();
@@ -1108,7 +1140,8 @@ impl ZettaCache {
                     state.remove_from_index(*locked_key.0.value(), value);
                     state.block_allocator.free(value.extent());
                 }
-                self.insert(locked_key, buf.to_owned());
+                self.insert(locked_key, buf.to_owned(), InsertSource::Heal)
+                    .await;
             }
         }
     }
diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs
index 9703d2304a34..3ff5d480a829 100644
--- a/cmd/zfs_object_agent/zettaobject/src/pool.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -19,6 +19,7 @@ use anyhow::Error;
 use anyhow::{Context, Result};
 use conv::ConvUtil;
 use futures::future;
+use futures::future::join;
 use futures::future::Either;
 use futures::future::Future;
 use futures::future::{join3, join5};
@@ -55,6 +56,7 @@ use util::maybe_die_with;
 use util::TerseVec;
 use uuid::Uuid;
 use zettacache::base_types::*;
+use zettacache::InsertSource;
 use zettacache::LookupResponse;
 use zettacache::ZettaCache;
@@ -1570,13 +1572,13 @@ impl Pool {
                        block
                    );
                }
-                LookupResponse::Absent(key) => cache.insert(key, data2),
+                LookupResponse::Absent(key) => cache.insert(key, data2, InsertSource::Write).await,
            }
        }
        receiver.await.unwrap();
    }
 
-    async fn read_block_impl(&self, block: BlockId, bypass_cache: bool) -> Vec<u8> {
+    async fn read_object_for_block(&self, block: BlockId, bypass_cache: bool) -> DataObjectPhys {
        // If we are in the middle of resuming, wait for that to complete before
        // processing this read. This is needed because we may be reading from
        // a block that hasn't yet been added to the ObjectBlockMap.
@@ -1601,6 +1603,12 @@ impl Pool {
            .unwrap();
        // XXX consider using debug_assert_eq
        assert_eq!(phys.blocks_size, phys.calculate_blocks_size());
+
+        phys
+    }
+
+    async fn read_block_impl(&self, block: BlockId, bypass_cache: bool) -> Vec<u8> {
+        let phys = self.read_object_for_block(block, bypass_cache).await;
        // XXX to_owned() copies the data; would be nice to return a reference
        phys.get_block(block).to_owned()
    }
@@ -1618,9 +1626,38 @@
            false => match cache.lookup(self.state.shared_state.guid, block).await {
                LookupResponse::Present((cached_vec, _key, _value)) => cached_vec,
                LookupResponse::Absent(key) => {
-                    let vec = self.read_block_impl(block, heal).await;
-                    // XXX clone() copies the data; would be nice to pass a reference
-                    cache.insert(key, vec.clone());
+                    let mut phys = self.read_object_for_block(block, heal).await;
+
+                    let vec = phys.blocks.remove(&block).unwrap().into_vec();
+
+                    join(
+                        async {
+                            // XXX Unfortunately, we must copy `vec` so that we can give it to `cache.insert`, while also returning it.
+                            cache.insert(key, vec.clone(), InsertSource::Read).await;
+                        },
+                        async {
+                            phys.blocks
+                                .into_iter()
+                                .map(|(b, buf)| async move {
+                                    if let LookupResponse::Absent(key) =
+                                        cache.lookup(self.state.shared_state.guid, b).await
+                                    {
+                                        cache
+                                            .insert(
+                                                key,
+                                                buf.into_vec(),
+                                                InsertSource::SpeculativeRead,
+                                            )
+                                            .await;
+                                    }
+                                })
+                                .collect::<FuturesUnordered<_>>()
+                                .for_each(|_| async move {})
+                                .await;
+                        },
+                    )
+                    .await;
+
                    vec
                }
            },