Skip to content

Commit

Permalink
Document more of the unzip implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
cuviper committed May 1, 2017
1 parent fa9e225 commit 4581a9a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/iter/collect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use super::unzip::unzip_indexed;
mod test;

/// Collects the results of the exact iterator into the specified vector.
///
/// This is not directly public, but called by `IndexedParallelIterator::collect_into`.
pub fn collect_into<I, T>(mut pi: I, v: &mut Vec<T>)
where I: IndexedParallelIterator<Item = T>,
T: Send
Expand Down Expand Up @@ -41,6 +43,8 @@ fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
}

/// Unzips the results of the exact iterator into the specified vectors.
///
/// This is not directly public, but called by `IndexedParallelIterator::unzip_into`.
pub fn unzip_into<I, A, B>(mut pi: I, left: &mut Vec<A>, right: &mut Vec<B>)
where I: IndexedParallelIterator<Item = (A, B)>,
A: Send,
Expand Down
27 changes: 26 additions & 1 deletion src/iter/unzip.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
use super::internal::*;
use super::*;

/// This trait abstracts the different ways we can "unzip" one parallel
/// iterator into two distinct consumers, which we can handle almost
/// identically apart from how to process the individual items.
trait UnzipOp<T>: Sync {
/// The type of item expected by the left consumer.
type Left: Send;

/// The type of item expected by the right consumer.
type Right: Send;

/// Consume one item and feed it to one or both of the underlying folders.
fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
where FA: Folder<Self::Left>,
FB: Folder<Self::Right>;

/// Reports whether this op may support indexed consumers.
/// - e.g. true for `unzip` where the item count passed through directly.
/// - e.g. false for `partition` where the sorting is not yet known.
fn indexable() -> bool {
false
}
}

/// Run an unzip-like operation into `ParallelExtend` collections.
fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)
where I: ParallelIterator,
OP: UnzipOp<I::Item>,
Expand All @@ -39,6 +50,8 @@ fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)

/// Unzips the items of a parallel iterator into a pair of arbitrary
/// `ParallelExtend` containers.
///
/// This is not directly public, but called by `ParallelIterator::unzip`.
pub fn unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB)
where I: ParallelIterator<Item = (A, B)>,
FromA: Default + ParallelExtend<A>,
Expand All @@ -50,6 +63,8 @@ pub fn unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB)
}

/// Unzip an `IndexedParallelIterator` into two arbitrary `Consumer`s.
///
/// This is not directly public, but called by `super::collect::unzip_into`.
pub fn unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result)
where I: IndexedParallelIterator<Item = (A, B)>,
CA: Consumer<A>,
Expand All @@ -65,6 +80,7 @@ pub fn unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result
pi.drive(consumer)
}

/// An `UnzipOp` that splits a tuple directly into the two consumers.
struct Unzip;

impl<A: Send, B: Send> UnzipOp<(A, B)> for Unzip {
Expand All @@ -86,6 +102,8 @@ impl<A: Send, B: Send> UnzipOp<(A, B)> for Unzip {

/// Partitions the items of a parallel iterator into a pair of arbitrary
/// `ParallelExtend` containers.
///
/// This is not directly public, but called by `ParallelIterator::partition`.
pub fn partition<I, A, B, P>(pi: I, predicate: P) -> (A, B)
where I: ParallelIterator,
A: Default + ParallelExtend<I::Item>,
Expand All @@ -95,6 +113,7 @@ pub fn partition<I, A, B, P>(pi: I, predicate: P) -> (A, B)
execute(pi, Partition { predicate: predicate })
}

/// An `UnzipOp` that routes items depending on a predicate function.
struct Partition<P> {
predicate: P,
}
Expand All @@ -121,6 +140,8 @@ impl<P, T> UnzipOp<T> for Partition<P>

/// Partitions and maps the items of a parallel iterator into a pair of
/// arbitrary `ParallelExtend` containers.
///
/// This is not directly public, but called by `ParallelIterator::partition_map`.
pub fn partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B)
where I: ParallelIterator,
A: Default + ParallelExtend<L>,
Expand All @@ -132,6 +153,7 @@ pub fn partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B)
execute(pi, PartitionMap { predicate: predicate })
}

/// An `UnzipOp` that routes items depending on how they are mapped `Either`.
struct PartitionMap<P> {
predicate: P,
}
Expand Down Expand Up @@ -184,7 +206,10 @@ impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB>
};
self.b.par_extend(iter);
}
result.unwrap()
// NB: If for some reason `b.par_extend` doesn't actually drive the
// iterator, then we won't have a result for the left side to return
// at all. We can't fake an arbitrary consumer's result, so panic.
result.expect("unzip consumers didn't execute!")
}

fn opt_len(&mut self) -> Option<usize> {
Expand Down

0 comments on commit 4581a9a

Please sign in to comment.