DOSE-122 policy for ingesting writes into zettacache (openzfs#457)
Prakash Surya authored Oct 11, 2021
1 parent cae1704 commit 87f8c33
Showing 3 changed files with 89 additions and 18 deletions.
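The core of this change is an admission policy: an insertion now charges its byte count against one of two 256 MB budgets (the cache_insert_blocking_buffer_bytes and cache_insert_nonblocking_buffer_bytes tunables) depending on why the block is being inserted. Demand reads block until budget is available, since the caller already paid the latency of a miss; writes, heals, and speculative reads are dropped when their budget is exhausted. Before the diff, here is a self-contained sketch of that admission pattern using tokio semaphores; the IngestPolicy and admit names are illustrative stand-ins, not code from this commit.

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};

enum Source {
    Heal,
    Read,
    SpeculativeRead,
    Write,
}

struct IngestPolicy {
    blocking_buffer_bytes: Arc<Semaphore>,    // demand reads wait on this budget
    nonblocking_buffer_bytes: Arc<Semaphore>, // other sources drop when it is empty
}

impl IngestPolicy {
    // Returns a permit sized to the block, or None if the insertion is dropped.
    async fn admit(&self, bytes: u32, source: Source) -> Option<OwnedSemaphorePermit> {
        match source {
            // A demand read already paid the latency of a cache miss, so it is
            // worth waiting for buffer space rather than losing the insertion.
            Source::Read => Some(
                self.blocking_buffer_bytes
                    .clone()
                    .acquire_many_owned(bytes)
                    .await
                    .expect("semaphore unexpectedly closed"),
            ),
            // Writes, heals, and speculative reads are opportunistic: if the
            // budget is exhausted, drop the insertion instead of stalling.
            Source::Heal | Source::SpeculativeRead | Source::Write => match self
                .nonblocking_buffer_bytes
                .clone()
                .try_acquire_many_owned(bytes)
            {
                Ok(permit) => Some(permit),
                Err(TryAcquireError::NoPermits) => None,
                Err(e) => panic!("unexpected semaphore error: {:?}", e),
            },
        }
    }
}

Dropping the returned permit, which in the diff happens once the buffered block has been written to disk, returns its bytes to the budget; that is what bounds the memory held by pending insertions.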
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/zettacache/src/lib.rs
@@ -13,6 +13,7 @@ mod space_map;
 mod zcachedb;
 mod zettacache;
 
+pub use crate::zettacache::InsertSource;
 pub use crate::zettacache::LookupResponse;
 pub use crate::zettacache::ZettaCache;
 pub use zcachedb::DumpStructuresOptions;
59 changes: 46 additions & 13 deletions cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -51,8 +51,8 @@ lazy_static! {
     static ref TARGET_CACHE_SIZE_PCT: u64 = get_tunable("target_cache_size_pct", 80);
     static ref HIGH_WATER_CACHE_SIZE_PCT: u64 = get_tunable("high_water_cache_size_pct", 82);
 
-    // number of insertions that can be buffered before we start dropping them; around a second's worth
-    static ref CACHE_INSERT_MAX_BUFFER: usize = get_tunable("cache_insert_max_buffer", 10_000);
+    static ref CACHE_INSERT_BLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_blocking_buffer_bytes", 256_000_000);
+    static ref CACHE_INSERT_NONBLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_nonblocking_buffer_bytes", 256_000_000);
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -132,7 +132,8 @@ pub struct ZettaCache {
     state: Arc<tokio::sync::Mutex<ZettaCacheState>>,
     outstanding_lookups: LockSet<IndexKey>,
     metrics: Arc<ZettaCacheMetrics>,
-    outstanding_inserts: Arc<Semaphore>,
+    blocking_buffer_bytes_available: Arc<Semaphore>,
+    nonblocking_buffer_bytes_available: Arc<Semaphore>,
 }
 
 #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
@@ -522,6 +523,13 @@ pub enum LookupResponse {
     Absent(LockedKey),
 }
 
+pub enum InsertSource {
+    Heal,
+    Read,
+    SpeculativeRead,
+    Write,
+}
+
 #[metered(registry=ZettaCacheMetrics)]
 impl ZettaCache {
     pub async fn create(path: &str) {
@@ -670,7 +678,12 @@ impl ZettaCache {
             state: Arc::new(tokio::sync::Mutex::new(state)),
             outstanding_lookups: LockSet::new(),
             metrics: Default::default(),
-            outstanding_inserts: Arc::new(Semaphore::new(*CACHE_INSERT_MAX_BUFFER)),
+            blocking_buffer_bytes_available: Arc::new(Semaphore::new(
+                *CACHE_INSERT_BLOCKING_BUFFER_BYTES,
+            )),
+            nonblocking_buffer_bytes_available: Arc::new(Semaphore::new(
+                *CACHE_INSERT_NONBLOCKING_BUFFER_BYTES,
+            )),
         };
 
         let (merge_rx, merge_index) = match checkpoint.merge_progress {
@@ -1033,17 +1046,36 @@ impl ZettaCache {
     #[measure(InFlight)]
     #[measure(Throughput)]
     #[measure(HitCount)]
-    pub fn insert(&self, locked_key: LockedKey, buf: Vec<u8>) {
+    pub async fn insert(&self, locked_key: LockedKey, buf: Vec<u8>, source: InsertSource) {
+        // The passed in buffer is only for a single block, which is capped to SPA_MAXBLOCKSIZE,
+        // and thus we should never have an issue converting the length to a "u32" here.
+        let bytes = u32::try_from(buf.len()).unwrap();
+
         // This permit will be dropped when the write to disk completes. It
         // serves to limit the number of insert()'s that we can buffer before
         // dropping (ignoring) insertion requests.
-        let insert_permit = match self.outstanding_inserts.clone().try_acquire_owned() {
-            Ok(permit) => permit,
-            Err(tokio::sync::TryAcquireError::NoPermits) => {
-                self.insert_failed_max_queue_depth(locked_key.0.value());
-                return;
-            }
-            Err(e) => panic!("unexpected error from try_acquire: {:?}", e),
+        let insert_permit = match source {
+            InsertSource::Heal | InsertSource::SpeculativeRead | InsertSource::Write => match self
+                .nonblocking_buffer_bytes_available
+                .clone()
+                .try_acquire_many_owned(bytes)
+            {
+                Ok(permit) => permit,
+                Err(tokio::sync::TryAcquireError::NoPermits) => {
+                    self.insert_failed_max_queue_depth(locked_key.0.value());
+                    return;
+                }
+                Err(e) => panic!("unexpected error from try_acquire_many_owned: {:?}", e),
+            },
+            InsertSource::Read => match self
+                .blocking_buffer_bytes_available
+                .clone()
+                .acquire_many_owned(bytes)
+                .await
+            {
+                Ok(permit) => permit,
+                Err(e) => panic!("unexpected error from acquire_many_owned: {:?}", e),
+            },
         };
 
         let block_access = self.block_access.clone();
@@ -1108,7 +1140,8 @@ impl ZettaCache {
                     state.remove_from_index(*locked_key.0.value(), value);
                     state.block_allocator.free(value.extent());
                 }
-                self.insert(locked_key, buf.to_owned());
+                self.insert(locked_key, buf.to_owned(), InsertSource::Heal)
+                    .await;
             }
         }
     }
47 changes: 42 additions & 5 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -19,6 +19,7 @@ use anyhow::Error;
 use anyhow::{Context, Result};
 use conv::ConvUtil;
 use futures::future;
+use futures::future::join;
 use futures::future::Either;
 use futures::future::Future;
 use futures::future::{join3, join5};
@@ -55,6 +56,7 @@ use util::maybe_die_with;
 use util::TerseVec;
 use uuid::Uuid;
 use zettacache::base_types::*;
+use zettacache::InsertSource;
 use zettacache::LookupResponse;
 use zettacache::ZettaCache;
 
@@ -1570,13 +1572,13 @@ impl Pool {
                         block
                     );
                 }
-                LookupResponse::Absent(key) => cache.insert(key, data2),
+                LookupResponse::Absent(key) => cache.insert(key, data2, InsertSource::Write).await,
             }
         }
         receiver.await.unwrap();
     }
 
-    async fn read_block_impl(&self, block: BlockId, bypass_cache: bool) -> Vec<u8> {
+    async fn read_object_for_block(&self, block: BlockId, bypass_cache: bool) -> DataObjectPhys {
         // If we are in the middle of resuming, wait for that to complete before
         // processing this read. This is needed because we may be reading from
         // a block that hasn't yet been added to the ObjectBlockMap.
@@ -1601,6 +1603,12 @@
             .unwrap();
         // XXX consider using debug_assert_eq
         assert_eq!(phys.blocks_size, phys.calculate_blocks_size());
+
+        phys
+    }
+
+    async fn read_block_impl(&self, block: BlockId, bypass_cache: bool) -> Vec<u8> {
+        let phys = self.read_object_for_block(block, bypass_cache).await;
         // XXX to_owned() copies the data; would be nice to return a reference
         phys.get_block(block).to_owned()
     }
@@ -1618,9 +1626,38 @@
             false => match cache.lookup(self.state.shared_state.guid, block).await {
                 LookupResponse::Present((cached_vec, _key, _value)) => cached_vec,
                 LookupResponse::Absent(key) => {
-                    let vec = self.read_block_impl(block, heal).await;
-                    // XXX clone() copies the data; would be nice to pass a reference
-                    cache.insert(key, vec.clone());
+                    let mut phys = self.read_object_for_block(block, heal).await;
+
+                    let vec = phys.blocks.remove(&block).unwrap().into_vec();
+
+                    join(
+                        async {
+                            // XXX Unfortunately, we must copy `vec` so that we can give it to `cache.insert`, while also returning it.
+                            cache.insert(key, vec.clone(), InsertSource::Read).await;
+                        },
+                        async {
+                            phys.blocks
+                                .into_iter()
+                                .map(|(b, buf)| async move {
+                                    if let LookupResponse::Absent(key) =
+                                        cache.lookup(self.state.shared_state.guid, b).await
+                                    {
+                                        cache
+                                            .insert(
+                                                key,
+                                                buf.into_vec(),
+                                                InsertSource::SpeculativeRead,
+                                            )
+                                            .await;
+                                    }
+                                })
+                                .collect::<FuturesUnordered<_>>()
+                                .for_each(|_| async move {})
+                                .await;
+                        },
+                    )
+                    .await;
+
+                    vec
                 }
             },
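The hunk above also restructures the read-miss path: instead of caching only the block that missed, read_block now pulls the whole object and speculatively offers every other block in it to the cache (InsertSource::SpeculativeRead), running those inserts concurrently via FuturesUnordered while the missed block itself is inserted with InsertSource::Read. Here is a minimal sketch of that fan-out pattern, with stub Cache and BlockId types standing in for the real zettacache API.

use futures::stream::{FuturesUnordered, StreamExt};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct BlockId(u64);

// Stub cache so the sketch stands alone; the real type is zettacache::ZettaCache.
struct Cache;

impl Cache {
    async fn contains(&self, _block: BlockId) -> bool {
        false // stub: pretend every sibling block is a cache miss
    }
    async fn speculative_insert(&self, _block: BlockId, _buf: Vec<u8>) {
        // stub: the real insert buffers the block for an asynchronous disk write
    }
}

// After fetching an object to satisfy one block, offer its remaining blocks
// to the cache concurrently rather than awaiting each insert in turn.
async fn ingest_siblings(cache: &Cache, siblings: Vec<(BlockId, Vec<u8>)>) {
    siblings
        .into_iter()
        .map(|(block, buf)| async move {
            if !cache.contains(block).await {
                cache.speculative_insert(block, buf).await;
            }
        })
        .collect::<FuturesUnordered<_>>() // one future per sibling block
        .for_each(|_| async {}) // drive them all to completion
        .await;
}

Since speculative inserts draw on the non-blocking budget, a full buffer merely drops these opportunistic insertions; the demand read is never slowed by them.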
