Skip to content

Commit

Permalink
*: remove unnecessary async blocks to save memory
Browse files Browse the repository at this point in the history
This commit favors FutureExt::map over async blocks to mitigate
the issue of async block doubled memory usage. Through the sysbench
oltp_read_only test, it was observed that this adjustment resulted
in approximately 26% reduction in memory usage.

See: rust-lang/rust#59087

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Feb 20, 2024
1 parent bca97c2 commit 9b9b2da
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 23 deletions.
17 changes: 11 additions & 6 deletions components/tikv_util/src/yatp_pool/future_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{

use fail::fail_point;
use futures::channel::oneshot::{self, Canceled};
use futures_util::future::FutureExt;
use prometheus::{IntCounter, IntGauge};
use tracker::TrackedFuture;
use yatp::{queue::Extras, task::future};
Expand Down Expand Up @@ -216,11 +217,13 @@ impl PoolInner {

metrics_running_task_count.inc();

let f = async move {
let _ = future.await;
// NB: Prefer FutureExt::map to async block, because an async block
// doubles memory usage.
// See https://github.com/rust-lang/rust/issues/59087
let f = future.map(move |_| {
metrics_handled_task_count.inc();
metrics_running_task_count.dec();
};
});

if let Some(extras) = extras {
self.pool.spawn(future::TaskCell::new(f, extras));
Expand All @@ -246,12 +249,14 @@ impl PoolInner {

let (tx, rx) = oneshot::channel();
metrics_running_task_count.inc();
self.pool.spawn(async move {
let res = future.await;
// NB: Prefer FutureExt::map to async block, because an async block
// doubles memory usage.
// See https://github.com/rust-lang/rust/issues/59087
self.pool.spawn(future.map(move |res| {
metrics_handled_task_count.inc();
metrics_running_task_count.dec();
let _ = tx.send(res);
});
}));
Ok(rx)
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/read_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::{
};

use file_system::{set_io_type, IoType};
use futures::{channel::oneshot, future::TryFutureExt};
use futures::{
channel::oneshot,
future::{FutureExt, TryFutureExt},
};
use kvproto::{errorpb, kvrpcpb::CommandPri};
use online_config::{ConfigChange, ConfigManager, ConfigValue, Result as CfgResult};
use prometheus::{core::Metric, Histogram, IntCounter, IntGauge};
Expand Down Expand Up @@ -172,10 +175,9 @@ impl ReadPoolHandle {
TaskCell::new(
TrackedFuture::new(with_resource_limiter(
ControlledFuture::new(
async move {
f.await;
f.map(move |_| {
running_tasks.dec();
},
}),
resource_ctl.clone(),
group_name,
),
Expand All @@ -185,10 +187,9 @@ impl ReadPoolHandle {
)
} else {
TaskCell::new(
TrackedFuture::new(async move {
f.await;
TrackedFuture::new(f.map(move |_| {
running_tasks.dec();
}),
})),
extras,
)
};
Expand All @@ -212,10 +213,9 @@ impl ReadPoolHandle {
{
let (tx, rx) = oneshot::channel::<T>();
let res = self.spawn(
async move {
let res = f.await;
f.map(move |res| {
let _ = tx.send(res);
},
}),
priority,
task_id,
metadata,
Expand Down
9 changes: 2 additions & 7 deletions src/storage/txn/sched_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use collections::HashMap;
use file_system::{set_io_type, IoType};
use futures::future::FutureExt;
use kvproto::{kvrpcpb::CommandPri, pdpb::QueryKind};
use pd_client::{Feature, FeatureGate};
use prometheus::local::*;
Expand Down Expand Up @@ -130,13 +131,7 @@ impl PriorityQueue {
extras.set_metadata(metadata.to_vec());
self.worker_pool.spawn_with_extras(
with_resource_limiter(
ControlledFuture::new(
async move {
f.await;
},
self.resource_ctl.clone(),
group_name,
),
ControlledFuture::new(f, self.resource_ctl.clone(), group_name),
resource_limiter,
),
extras,
Expand Down

0 comments on commit 9b9b2da

Please sign in to comment.