I wonder if there is any atomicity violation in poll_ready #2429

Closed
BurtonQin opened this issue Apr 22, 2020 · 6 comments
Labels
A-tokio Area: The main tokio crate C-question User questions that are neither feature requests nor bug reports M-io Module: tokio/io T-docs Topic: documentation

Comments

@BurtonQin
Contributor

Version

0.2.18

Platform

64-bit Windows 10

Description

By reading the source code, I wonder whether there is an atomicity violation in the poll_ready macro in tokio/src/io/poll_evented.rs:

let mut cached = $me.inner.$cache.load(Relaxed);
let mask = $mask | platform::hup() | platform::error();

// See if the current readiness matches any bits.
let mut ret = mio::Ready::from_usize(cached) & $mask;

if ret.is_empty() {
    // Readiness does not match, consume the registration's readiness
    // stream. This happens in a loop to ensure that the stream gets
    // drained.
    loop {
        let ready = match $poll? {
            Poll::Ready(v) => v,
            Poll::Pending => return Poll::Pending,
        };

        cached |= ready.as_usize();

        // Update the cache store
        $me.inner.$cache.store(cached, Relaxed);

        ret |= ready & mask;

        if !ret.is_empty() {
            return Poll::Ready(Ok(ret));
        }
    }
} else {
    // Check what's new with the registration stream. This will not
    // request to be notified
    if let Some(ready) = $me.inner.registration.$take()? {
        cached |= ready.as_usize();
        $me.inner.$cache.store(cached, Relaxed);
    }

    Poll::Ready(Ok(mio::Ready::from_usize(cached)))
}

The above logic can be simplified to

1. let mut cached = self.cache.load();
2. ret = F1(cached);
3. Check if ret.is_empty();
4. if not: cached = F2(cached); self.cache.store(cached);

If two threads T1 and T2 execute this simultaneously:

T1: 1, 2, 3 (ret.is_empty() is false)
T2: 1, 2, 3, 4 (updates self.cache; a fresh load might now make ret.is_empty() true)
T1: 4 (still acts as if ret.is_empty() is false and updates self.cache based on its stale load)

The problem is that load() and store() are separate operations, and the value passed to store() depends on the earlier load().
Therefore, if the load()/store() pair is interleaved by another thread's store(), the later store() overwrites it, and a possible atomicity violation (a lost update) happens.

I wonder if this is an issue. If not, how does the code logic avoid it? Thank you.

@Darksonn Darksonn added A-tokio Area: The main tokio crate C-question User questions that are neither feature requests nor bug reports M-io Module: tokio/io labels Apr 22, 2020
@carllerche
Member

Probably.... short answer is: don't do it 😄

There are bigger issues with concurrently calling poll_read_ready and poll_write_ready... they can only remember one waker.

I looked at the fns and was surprised there was no documentation calling this out. Would you mind submitting a PR adding some documentation stating that the caller must ensure that the functions are not being concurrently called?

@carllerche carllerche added the T-docs Topic: documentation label Apr 23, 2020
@Darksonn
Contributor

Note that for types such as TcpStream, this is enforced by the type system: the functions take &mut self, which makes it impossible to call them concurrently. This is why splitting works the way it does.

// == Poll IO functions that takes `&self` ==
//
// They are not public because (taken from the doc of `PollEvented`):
//
// While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
// caller must ensure that there are at most two tasks that use a
// `PollEvented` instance concurrently. One for reading and one for writing.
// While violating this requirement is "safe" from a Rust memory model point
// of view, it will result in unexpected behavior in the form of lost
// notifications and tasks hanging.
pub(crate) fn poll_read_priv(
    &self,
    cx: &mut Context<'_>,
    buf: &mut [u8],
) -> Poll<io::Result<usize>> {
    ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;

    match self.io.get_ref().read(buf) {
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
            self.io.clear_read_ready(cx, mio::Ready::readable())?;
            Poll::Pending
        }
        x => Poll::Ready(x),
    }
}

pub(super) fn poll_write_priv(
    &self,
    cx: &mut Context<'_>,
    buf: &[u8],
) -> Poll<io::Result<usize>> {
    ready!(self.io.poll_write_ready(cx))?;

    match self.io.get_ref().write(buf) {
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
            self.io.clear_write_ready(cx)?;
            Poll::Pending
        }
        x => Poll::Ready(x),
    }
}

@BurtonQin
Contributor Author

BurtonQin commented Apr 24, 2020

The following safe code calls poll_read_ready() from two threads, and it builds successfully.

//main.rs
#![feature(async_closure)]
use tokio::io::PollEvented;

use futures::ready;
use mio::Ready;
use mio::net::{TcpStream, TcpListener};
use std::io;
use std::task::{Context, Poll};
use std::net::SocketAddr;
use futures::future::poll_fn;
use std::sync::Arc;
use std::thread;

struct MyListener {
    poll_evented: PollEvented<TcpListener>,
}

impl MyListener {
    pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> {
        
        let ready = Ready::readable();

        ready!(self.poll_evented.poll_read_ready(cx, ready))?;
        // poll_read_ready() shall not be concurrently called
        match self.poll_evented.get_ref().accept() {
            Ok((socket, _)) => Poll::Ready(Ok(socket)),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.poll_evented.clear_read_ready(cx, ready)?;
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}

async fn foo() {
    let addr = "127.0.0.1:8000".parse::<SocketAddr>().unwrap();
    let listener1 = Arc::new(MyListener {
        poll_evented: PollEvented::new(TcpListener::bind(&addr).unwrap()).unwrap(),
    });

    let listener2 = Arc::clone(&listener1);

    poll_fn(|cx| listener1.poll_accept(cx)).await.unwrap();  // poll_read_ready() called in thread 1.

    let th = thread::spawn(async move ||
        poll_fn(move |cx| listener2.poll_accept(cx)).await  // poll_read_ready() called in thread 2.
    );
    th.join().unwrap().await.unwrap();
}

#[tokio::main]
async fn main() {
    foo().await
}
//Cargo.toml
[dependencies]
tokio = { version = "0.2.18", features = ["io-driver"] }
mio = "0.6.10"
futures = "0.3.4"

@Darksonn
Contributor

Darksonn commented Apr 24, 2020

Calling it concurrently will mess up mio's internal state, but it shouldn't cause UB. My previous comment on TcpStream was regarding the tokio::net::TcpStream type, not the PollEvented<mio::net::TcpStream> type, which doesn't have these safeguards.

@BurtonQin
Contributor Author

When the safety of a function depends on its caller, it does not seem that safe.
I think documentation is good enough for now.
In the future, perhaps the function interfaces should be changed (to take &mut self?) so that the compiler can prevent developers from calling them concurrently.

@carllerche
Copy link
Member

Safety means something very specific in Rust.

That said, I think the fact that this concern leaked out is not ideal. Fixing it requires a breaking change, though, so until 0.3, docs are the best we can do.

jensim pushed a commit to jensim/tokio that referenced this issue Jun 7, 2020