Skip to content

Commit

Permalink
io: add mio::Ready argument to PollEvented (tokio-rs#2419)
Browse files Browse the repository at this point in the history
Add additional methods to allow PollEvented to be created with an appropriate
mio::Ready state, so that it can be properly registered with the reactor.

Fixes tokio-rs#2413
  • Loading branch information
dbcfd authored and jensim committed Jun 7, 2020
1 parent bae8754 commit 3c7187b
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 12 deletions.
24 changes: 14 additions & 10 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,14 @@ impl fmt::Debug for Handle {
// ===== impl Inner =====

impl Inner {
/// Registers an I/O resource with the reactor.
/// Registers an I/O resource with the reactor for a given `mio::Ready` state.
///
/// The registration token is returned.
pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<Address> {
pub(super) fn add_source(
&self,
source: &dyn Evented,
ready: mio::Ready,
) -> io::Result<Address> {
let address = self.io_dispatch.alloc().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
Expand All @@ -253,7 +257,7 @@ impl Inner {
self.io.register(
source,
mio::Token(address.to_usize()),
mio::Ready::all(),
ready,
mio::PollOpt::edge(),
)?;

Expand Down Expand Up @@ -339,12 +343,12 @@ mod tests {
let inner = reactor.inner;
let inner2 = inner.clone();

let token_1 = inner.add_source(&NotEvented).unwrap();
let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});

let token_2 = inner.add_source(&NotEvented).unwrap();
let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
thread.join().unwrap();

assert!(token_1 != token_2);
Expand All @@ -360,15 +364,15 @@ mod tests {
// add sources to fill up the first page so that the dropped index
// may be reused.
for _ in 0..31 {
inner.add_source(&NotEvented).unwrap();
inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
}

let token_1 = inner.add_source(&NotEvented).unwrap();
let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});

let token_2 = inner.add_source(&NotEvented).unwrap();
let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
thread.join().unwrap();

assert!(token_1 != token_2);
Expand All @@ -383,11 +387,11 @@ mod tests {
let inner2 = inner.clone();

let thread = thread::spawn(move || {
let token_2 = inner2.add_source(&NotEvented).unwrap();
let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap();
token_2
});

let token_1 = inner.add_source(&NotEvented).unwrap();
let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
let token_2 = thread.join().unwrap();

assert!(token_1 != token_2);
Expand Down
30 changes: 29 additions & 1 deletion tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,35 @@ where
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
pub fn new(io: E) -> io::Result<Self> {
let registration = Registration::new(&io)?;
PollEvented::new_with_ready(io, mio::Ready::all())
}

/// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Ready`
/// state. `new_with_ready` should be used over `new` when you need control over the readiness
/// state, such as when a file descriptor only allows reads. This does not add `hup` or `error`
/// so if you are interested in those states, you will need to add them to the readiness state
/// passed to this function.
///
/// An example to listen to read only
///
/// ```rust
/// ##[cfg(unix)]
/// mio::Ready::from_usize(
/// mio::Ready::readable().as_usize()
/// | mio::unix::UnixReady::error().as_usize()
/// | mio::unix::UnixReady::hup().as_usize()
/// );
/// ```
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
let registration = Registration::new_with_ready(&io, ready)?;
Ok(Self {
io: Some(io),
inner: Inner {
Expand Down
39 changes: 38 additions & 1 deletion tokio/src/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,49 @@ impl Registration {
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
{
Registration::new_with_ready(io, mio::Ready::all())
}

/// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state.
/// `new_with_ready` should be used over `new` when you need control over the readiness state,
/// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if
/// you are interested in those states, you will need to add them to the readiness state passed
/// to this function.
///
/// An example to listen to read only
///
/// ```rust
/// ##[cfg(unix)]
/// mio::Ready::from_usize(
/// mio::Ready::readable().as_usize()
/// | mio::unix::UnixReady::error().as_usize()
/// | mio::unix::UnixReady::hup().as_usize()
/// );
/// ```
///
/// # Return
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
///
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
{
let handle = Handle::current();
let address = if let Some(inner) = handle.inner() {
inner.add_source(io)?
inner.add_source(io, ready)?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
Expand Down

0 comments on commit 3c7187b

Please sign in to comment.