diff --git a/cmd/zfs_object_agent/util/src/message.rs b/cmd/zfs_object_agent/util/src/message.rs index 972bfe87dcb0..df1e7c6e9b9b 100644 --- a/cmd/zfs_object_agent/util/src/message.rs +++ b/cmd/zfs_object_agent/util/src/message.rs @@ -137,6 +137,7 @@ pub const TYPE_LIST_DEVICES: &str = "list devices"; pub const TYPE_ZCACHE_IOSTAT: &str = "zcache iostat"; pub const TYPE_ZCACHE_STATS: &str = "zcache stats"; pub const TYPE_ADD_DISK: &str = "add disk"; +pub const TYPE_EXPAND_DISK: &str = "expand disk"; pub const TYPE_SYNC_CHECKPOINT: &str = "sync checkpoint"; pub const TYPE_INITIATE_MERGE: &str = "initiate merge"; @@ -145,5 +146,16 @@ pub struct AddDiskRequest { pub path: PathBuf, } +#[derive(Serialize, Deserialize, Debug)] +pub struct ExpandDiskRequest { + pub path: PathBuf, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ExpandDiskResponse { + pub new_size: u64, + pub additional_bytes: u64, +} + // We assume that a single write of this size is atomic. pub const SUPERBLOCK_SIZE: usize = 4 * 1024; diff --git a/cmd/zfs_object_agent/zcache/src/expand.rs b/cmd/zfs_object_agent/zcache/src/expand.rs new file mode 100644 index 000000000000..87720fdb0d1b --- /dev/null +++ b/cmd/zfs_object_agent/zcache/src/expand.rs @@ -0,0 +1,52 @@ +//! `zcache expand` subcommand + +use std::path::PathBuf; + +use anyhow::Result; +use async_trait::async_trait; +use clap::Parser; +use util::message::ExpandDiskRequest; +use util::message::ExpandDiskResponse; +use util::message::TYPE_EXPAND_DISK; +use util::nice_p2size; +use util::writeln_stdout; + +use crate::remote_channel::RemoteChannel; +use crate::subcommand::ZcacheSubCommand; + +#[derive(Parser)] +#[clap(about = "Expand a disk in the ZettaCache.")] +pub struct Expand { + path: PathBuf, +} + +#[async_trait] +impl ZcacheSubCommand for Expand { + async fn invoke(&self) -> Result<()> { + let mut remote = RemoteChannel::new(true).await?; + + let request = ExpandDiskRequest { + path: self.path.clone(), + }; + + let nvlist = remote + .call(TYPE_EXPAND_DISK, Some(nvpair::to_nvlist(&request).unwrap())) + .await?; + let response: ExpandDiskResponse = nvpair::from_nvlist(&nvlist)?; + if response.additional_bytes > 0 { + writeln_stdout!( + "Disk {:?} expanded, new size {} (added {})", + self.path, + nice_p2size(response.new_size), + nice_p2size(response.additional_bytes) + ); + } else { + writeln_stdout!( + "Disk {:?} expansion not needed, size {}", + self.path, + nice_p2size(response.new_size) + ); + } + Ok(()) + } +} diff --git a/cmd/zfs_object_agent/zcache/src/main.rs b/cmd/zfs_object_agent/zcache/src/main.rs index b02935a3cdee..49e7079509be 100644 --- a/cmd/zfs_object_agent/zcache/src/main.rs +++ b/cmd/zfs_object_agent/zcache/src/main.rs @@ -6,6 +6,7 @@ #![deny(clippy::print_stderr)] mod add; +mod expand; mod hits; mod iostat; mod labelclear; @@ -59,6 +60,7 @@ enum Commands { List(list::List), Stats(stats::Stats), Add(add::Add), + Expand(expand::Expand), Sync(sync::Sync), Labelclear(labelclear::Labelclear), @@ -75,6 +77,7 @@ impl Commands { Commands::List(list) => list, Commands::Stats(stats) => stats, Commands::Add(add) => add, + Commands::Expand(expand) => expand, Commands::Sync(sync) => sync, Commands::Labelclear(labelclear) => labelclear, Commands::ClearHitData(clear_hit_data) => clear_hit_data, diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs index 7662981fbc10..3777bd9f1f8d 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_access.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs @@ -9,6 +9,7 @@ use std::os::unix::prelude::OpenOptionsExt; use std::path::Path; use std::path::PathBuf; use std::sync::atomic::Ordering; +use std::sync::Mutex; use std::sync::RwLock; use std::thread::sleep; use std::time::Duration; @@ -35,6 +36,7 @@ use tokio::sync::oneshot; use util::from64::AsUsize; use util::iter_wrapping; use util::measure; +use util::message::ExpandDiskResponse; use util::serde::from_json_slice; use util::tunable; use util::with_alloctag; @@ -149,7 +151,7 @@ pub struct Disk { path: PathBuf, canonical_path: PathBuf, - size: u64, + size: Mutex, sector_size: usize, #[derivative(Debug = "ignore")] io_stats: &'static DiskIoStats, @@ -216,20 +218,7 @@ impl Disk { .with_context(|| format!("opening disk {path:?}"))?; // see comment in `struct Disk` let file = &*Box::leak(Box::new(file)); - let stat = nix::sys::stat::fstat(file.as_raw_fd())?; - trace!("stat: {:?}", stat); - let mode = SFlag::from_bits_truncate(stat.st_mode); - let sector_size; - let size; - if mode.contains(SFlag::S_IFBLK) { - size = blkgetsize64(file)?; - sector_size = blksszget(file)?; - } else if mode.contains(SFlag::S_IFREG) { - size = u64::try_from(stat.st_size)?; - sector_size = *MIN_SECTOR_SIZE; - } else { - panic!("{path:?}: invalid file type {mode:?}"); - } + let (sector_size, size) = disk_sizes(file)?; let short_name = path.file_name().unwrap().to_string_lossy().to_string(); let canonical_path = Path::new(path).canonicalize()?; @@ -258,7 +247,7 @@ impl Disk { file, path: path.to_owned(), canonical_path, - size, + size: Mutex::new(size), sector_size, io_stats, reader_tx, @@ -560,6 +549,26 @@ impl BlockAccess { Ok(id) } + // Returns the number of bytes added to the disk. + pub fn expand_disk(&self, disk: DiskId) -> Result { + let disks = self.disks.read().unwrap(); + let disk = &disks[disk.index()]; + let (_, new_size) = disk_sizes(disk.file)?; + let mut size = disk.size.lock().unwrap(); + let additional_bytes = new_size.checked_sub(*size).ok_or_else(|| { + anyhow!( + "{disk:?} {:?} ({:?}) size decreased from {size} to {new_size}", + disk.path, + disk.canonical_path, + ) + })?; + *size = new_size; + Ok(ExpandDiskResponse { + additional_bytes, + new_size, + }) + } + /// Note: In the future we'll support device removal in which case the /// DiskId's will probably not be sequential. By using this accessor we /// need not assume anything about the values inside the DiskId's. @@ -567,6 +576,19 @@ impl BlockAccess { (0..self.disks.read().unwrap().len()).map(DiskId::new) } + pub fn path_to_disk_id(&self, path: &Path) -> Result { + let canonical_path = path.canonicalize()?; + self.disks + .read() + .unwrap() + .iter() + .position(|disk| disk.canonical_path == canonical_path) + .map(DiskId::new) + .ok_or_else(|| { + anyhow!("disk {path:?} ({canonical_path:?}) is not part of the zettacache") + }) + } + // Gather a list of devices for zcache list_devices command. pub fn list_devices(&self) -> DeviceList { let devices = self @@ -576,14 +598,17 @@ impl BlockAccess { .iter() .map(|d| DeviceEntry { name: d.path.clone(), - size: d.size, + size: *d.size.lock().unwrap(), }) .collect(); DeviceList { devices } } pub fn disk_size(&self, disk: DiskId) -> u64 { - self.disks.read().unwrap()[disk.index()].size + *self.disks.read().unwrap()[disk.index()] + .size + .lock() + .unwrap() } pub fn disk_extent(&self, disk: DiskId) -> Extent { @@ -820,6 +845,20 @@ fn blksszget(file: &File) -> Result { Ok(ssz) } +/// get (sector_size, disk_size), both in bytes +fn disk_sizes(file: &File) -> Result<(usize, u64)> { + let stat = nix::sys::stat::fstat(file.as_raw_fd())?; + trace!("stat: {:?}", stat); + let mode = SFlag::from_bits_truncate(stat.st_mode); + if mode.contains(SFlag::S_IFBLK) { + Ok((blksszget(file)?, blkgetsize64(file)?)) + } else if mode.contains(SFlag::S_IFREG) { + Ok((*MIN_SECTOR_SIZE, u64::try_from(stat.st_size)?)) + } else { + panic!("{file:?}: invalid file type {mode:?}"); + } +} + /// use pread() to read into an aligned vector fn pread_aligned(file: &File, offset: i64, len: usize, alignment: usize) -> Result { let mut vec = with_alloctag("pread()", || AlignedVec::with_capacity(len, alignment)); diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs index 246f6e790a37..0d7388c47eea 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use anyhow::anyhow; use anyhow::Result; use arc_swap::ArcSwapAny; use arc_swap::ArcSwapOption; @@ -48,6 +49,7 @@ use tokio::time::sleep_until; use util::concurrent_batch::ConcurrentBatch; use util::lock_non_send; use util::measure; +use util::message::ExpandDiskResponse; use util::nice_p2size; use util::super_trace; use util::tunable; @@ -965,6 +967,13 @@ impl ZettaCache { } } + pub async fn expand_disk(&self, path: &Path) -> Result { + match &*self.inner.load() { + Some(inner) => inner.expand_disk(path).await, + None => Err(anyhow!("disk {path:?} is not part of the zettacache")), + } + } + pub async fn initiate_merge(&self) { if let Some(inner) = &*self.inner.load() { inner.initiate_merge().await; @@ -2065,6 +2074,13 @@ impl Inner { Ok(()) } + // Returns the amount of additional space, in bytes + async fn expand_disk(&self, path: &Path) -> Result { + let additional_bytes = self.locked.lock().await.expand_disk(path)?; + self.sync_checkpoint().await; + Ok(additional_bytes) + } + async fn initiate_merge(&self) { self.locked.lock().await.request_merge(); self.sync_checkpoint().await; @@ -2827,6 +2843,29 @@ impl Locked { Ok(()) } + // Returns the amount of additional space, in bytes. + fn expand_disk(&mut self, path: &Path) -> Result { + let disk = self.block_access.path_to_disk_id(path)?; + let response = self.block_access.expand_disk(disk)?; + if response.additional_bytes > 0 { + let phys = self.primary.disks.get_mut(&disk).unwrap(); + let expanded_capacity = Extent::new(disk, phys.size, response.additional_bytes); + info!("expanding existing disk {path:?}: {expanded_capacity:?}"); + self.slab_allocator.extend(expanded_capacity); + + // Update disk size in primary + phys.size = response.new_size; + + // The hit data isn't accurate across cache size changes, so clear + // it, which also updates the histogram parameters to reflect the + // new cache size. + self.clear_hit_data(); + } else { + info!("{disk:?} ({path:?}) has no expansion capacity"); + } + Ok(response) + } + fn request_merge(&mut self) { self.merge_requested = true; } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs index d2a52d270c6f..2d240c67599a 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs @@ -12,7 +12,7 @@ use std::time::Instant; use anyhow::anyhow; use anyhow::Context; use anyhow::Result; -use async_stream::stream; +use async_stream::try_stream; use async_trait::async_trait; use azure_core::HttpError; use azure_identity::token_credentials::ImdsManagedIdentityCredential; @@ -548,11 +548,11 @@ impl ObjectAccessTrait for BlobObjectAccess { start_after: Option, use_delimiter: bool, list_prefixes: bool, - ) -> Pin + Send + '_>> { + ) -> Pin> + Send + '_>> { let msg = format!("list {} (after {:?})", prefix, start_after); let list_prefix = prefix; - let stream_result = stream! { + Box::pin(try_stream! { let output = retry(&msg, None, || async { let container_client = self.get_container_client().await; let list_builder = match use_delimiter { @@ -575,7 +575,7 @@ impl ObjectAccessTrait for BlobObjectAccess { Ok(res) => Ok(res), } }) - .await.unwrap(); + .await?; // XXX The performance of this is likely to be quite bad. We need a better solution. DOSE-1215 let initial = start_after.unwrap_or("".to_string()); @@ -594,9 +594,7 @@ impl ObjectAccessTrait for BlobObjectAccess { } } } - }; - - Box::pin(stream_result) + }) } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs index 44324b38c6f6..4ee7fa21b842 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs @@ -492,8 +492,18 @@ impl ObjectAccess { ) -> impl Stream + Send + '_ { self.as_trait() .list(prefix, start_after, use_delimiter, false) + .map(|result| result.unwrap()) } + pub fn try_list_objects( + &self, + prefix: String, + start_after: Option, + use_delimiter: bool, + ) -> impl Stream> + Send + '_ { + self.as_trait() + .list(prefix, start_after, use_delimiter, false) + } pub async fn collect_objects( &self, prefix: String, @@ -503,7 +513,9 @@ impl ObjectAccess { } pub fn list_prefixes(&self, prefix: String) -> impl Stream + '_ { - self.as_trait().list(prefix, None, true, true) + self.as_trait() + .list(prefix, None, true, true) + .map(|result| result.unwrap()) } pub fn collect_stats(&self) -> HashMap { @@ -567,7 +579,7 @@ pub trait ObjectAccessTrait: Send + Sync { start_after: Option, use_delimiter: bool, list_prefixes: bool, - ) -> Pin + Send + '_>>; + ) -> Pin> + Send + '_>>; async fn get_object( &self, diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs index 5041ef168e20..2ce1c9491c0d 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs @@ -8,7 +8,7 @@ use std::time::Instant; use anyhow::anyhow; use anyhow::Context; use anyhow::Result; -use async_stream::stream; +use async_stream::try_stream; use async_trait::async_trait; use bytes::Bytes; use bytes::BytesMut; @@ -467,7 +467,7 @@ impl ObjectAccessTrait for S3ObjectAccess { start_after: Option, use_delimiter: bool, list_prefixes: bool, - ) -> Pin + Send>> { + ) -> Pin> + Send>> { let mut continuation_token = None; // XXX ObjectAccess should really be refcounted (behind Arc) let client = self.client.clone(); @@ -476,7 +476,7 @@ impl ObjectAccessTrait for S3ObjectAccess { true => Some("/".to_string()), false => None, }; - Box::pin(stream! { + Box::pin(try_stream! { loop { let output = retry( &format!("list {} (after {:?})", prefix, start_after), @@ -495,8 +495,7 @@ impl ObjectAccessTrait for S3ObjectAccess { Ok(client.list_objects_v2(req).await?) }, ) - .await - .unwrap(); + .await?; if list_prefixes { if let Some(prefixes) = output.common_prefixes { diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs index f2237d3ac6b1..c5e6fec358b2 100644 --- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs @@ -121,6 +121,7 @@ impl RootConnectionState { ); server.register_handler(TYPE_CLEAR_HIT_DATA, Box::new(Self::clear_hit_data)); server.register_handler(TYPE_ADD_DISK, Box::new(Self::add_disk)); + server.register_handler(TYPE_EXPAND_DISK, Box::new(Self::expand_disk)); server.register_handler(TYPE_SYNC_CHECKPOINT, Box::new(Self::sync_checkpoint)); server.register_handler(TYPE_INITIATE_MERGE, Box::new(Self::initiate_merge)); server.register_struct_handler(MessageType::ReadBlock, Box::new(Self::read_block)); @@ -607,6 +608,20 @@ impl RootConnectionState { })) } + fn expand_disk(&mut self, nvl: NvList) -> HandlerReturn { + let cache = self.cache.clone(); + Ok(Box::pin(async move { + let request: ExpandDiskRequest = nvpair::from_nvlist(&nvl)?; + debug!("got {:?}", request); + + let result = cache + .expand_disk(&request.path) + .await + .map_err(FailureMessage::new); + return_result(TYPE_EXPAND_DISK, (), result, true) + })) + } + fn sync_checkpoint(&mut self, nvl: NvList) -> HandlerReturn { let cache = self.cache.clone(); Ok(Box::pin(async move { diff --git a/cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs b/cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs index 49675193ba8d..d7e27adc119e 100644 --- a/cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs +++ b/cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use futures::TryStreamExt; use rand::Rng; use serde::Deserialize; use util::writeln_stderr; @@ -20,15 +21,12 @@ struct Error { message: String, } -// Test by writing and deleting an object. -async fn do_test_connectivity(object_access: &ObjectAccess) -> Result<(), String> { - let num: u64 = rand::thread_rng().gen(); - let file = format!("test/test_connectivity_{}", num); - let content = "test connectivity to S3".as_bytes().to_vec(); +async fn create_object_test(object_access: &ObjectAccess, key: String) -> Result<(), String> { + let content = "test connectivity to object storage".as_bytes().to_vec(); match object_access .put_object_timed( - file.clone(), + key.clone(), content.into(), ObjectAccessOpType::MetadataPut, Some(Duration::from_secs(30)), @@ -50,7 +48,7 @@ async fn do_test_connectivity(object_access: &ObjectAccess) -> Result<(), String if let Some(index) = body.find("(&body[index..]) { return Err(format!( - "Connectivity test failed: {}: {}", + "unable to create: {}, {}", error.code, error.message )); } @@ -58,21 +56,62 @@ async fn do_test_connectivity(object_access: &ObjectAccess) -> Result<(), String // If the error string can not be deserialized as xml, return the entirety of // the error back. - Err(format!("Connectivity test failed: {}", body)) + Err(format!("unable to create: {}", body)) } - Err(OAError::TimeoutError(_)) => { - Err("Connectivity test failed with a timeout.".to_string()) + Err(OAError::TimeoutError(_)) => Err("connection timed out.".to_string()), + Err(OAError::RequestError(RequestError::Credentials(err))) => { + Err(format!("credentials error: {}", err)) } - Err(OAError::RequestError(RequestError::Credentials(err))) => Err(format!( - "Connectivity test failed due to a credentials error: {}", - err - )), - Err(err) => Err(format!("Connectivity test failed: {}", err)), - Ok(_) => { - object_access.delete_object(file).await; - Ok(()) + Err(err) => Err(format!("{}", err)), + Ok(_) => Ok(()), + } +} + +// Test by writing and deleting an object. +async fn do_test_connectivity(object_access: &ObjectAccess) -> Result<(), String> { + let num: u64 = rand::thread_rng().gen(); + let prefix = String::from("test/"); + let file = format!("{}test_connectivity_{}", prefix, num); + + create_object_test(object_access, file.clone()).await?; + + if let Err(e) = object_access + .get_object(file.clone(), ObjectAccessOpType::MetadataGet) + .await + { + return Err(format!("unable to find object: {e}")); + } + + let objects = object_access + .try_list_objects(prefix, None, false) + .try_collect::>() + .await; + + let objects = match objects { + Ok(objects) => objects, + Err(e) => { + let content = e.to_string(); + if let Ok(error) = serde_xml_rs::from_str::(&content) { + return Err(format!("unable to list objects: {}", error.message)); + } else { + return Err(format!("unable to list objects: {}", content)); + } } + }; + + if objects != vec![file.clone()] { + return Err(format!( + "object listing mismatch, expected {}, got {:?}", + file, objects + )); + } + + object_access.delete_object(file.clone()).await; + if object_access.object_exists(file.clone()).await { + return Err("unable to delete objects".to_string()); } + + Ok(()) } pub async fn test_connectivity(protocol: ObjectAccessProtocol, bucket: String) { @@ -86,7 +125,7 @@ pub async fn test_connectivity(protocol: ObjectAccessProtocol, bucket: String) { std::process::exit(match do_test_connectivity(&object_access).await { Err(err) => { - writeln_stderr!("{}", err); + writeln_stderr!("Connectivity test failed: {}", err); 1 } Ok(_) => {