Added Condition and boundedForEach #249

Open
wants to merge 4 commits into
base: master

Member

@jonasfj jonasfj commented Aug 16, 2023

Adding:

  • Notifier with wait that'll be resolved when notify() is called. This is a primitive for signaling.
  • Stream.parallelForEach(maxParallel, eachFn, onError) for calling eachFn in parallel for each item in the stream.

It's possible that onError really should be called ignoreError and only take Object e or even Exception e. Since:

  • You shouldn't use the stack trace to decide if you are ignoring an error.
  • You should never ignore an error, only exceptions.

Or maybe we should call it continueIf, because it really makes sense to write:
continueIf: (Exception e) => e is TimeoutException and things like that.

Having an onError method where you can choose to rethrow or not is really bad, because Error.throwWithStackTrace is annoying to call, so people are likely to lose their stack traces.
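
To make the stack-trace concern concrete, here is a minimal sketch. The names `failing`, `rethrowLosingTrace`, `rethrowKeepingTrace`, and `run` are hypothetical and exist only for this illustration; `run` merely stands in for whatever code would invoke an `onError`-style callback. Only `Error.throwWithStackTrace` is a real `dart:core` API.

```dart
import 'dart:async';

// Hypothetical operation that always fails; for illustration only.
Future<void> failing() async {
  throw TimeoutException('too slow');
}

// An onError-style callback that re-throws with `throw`. The stack trace
// `st` is dropped, and a new trace is recorded at this throw.
FutureOr<void> rethrowLosingTrace(Object e, StackTrace st) {
  throw e;
}

// The same callback, keeping the original stack trace.
FutureOr<void> rethrowKeepingTrace(Object e, StackTrace st) {
  Error.throwWithStackTrace(e, st);
}

// Hypothetical driver standing in for the code that invokes onError.
Future<void> run(FutureOr<void> Function(Object, StackTrace) onError) async {
  try {
    await failing();
  } catch (e, st) {
    await onError(e, st);
  }
}

Future<void> main() async {
  for (final onError in [rethrowLosingTrace, rethrowKeepingTrace]) {
    try {
      await run(onError);
    } catch (e, st) {
      // Compare the two traces: only the second should point at `failing`.
      print('$e\n$st\n');
    }
  }
}
```

Inside a callback there is no enclosing `catch` clause, so `rethrow` is not available, which is why the shorter `throw e` is the tempting choice.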

@jonasfj jonasfj requested a review from lrhn August 16, 2023 10:01
lib/src/condition_variable.dart Outdated
}
return _completer.future;
}
}
Member

This is equivalent to a broadcast Stream<void> where you can only wait for the next event. An implementation could be:

class Notifier {
  final _control = StreamController<void>.broadcast();
  void notify() { _control.add(null); }
  Future<void> get wait => _control.stream.first;
}

(Probably more expensive, though.)

lib/src/stream_extensions.dart Outdated
@jonasfj jonasfj marked this pull request as ready for review August 22, 2023 12:59
@jonasfj jonasfj requested a review from lrhn August 22, 2023 12:59
Future<void> parallelForEach(
int maxParallel,
FutureOr<void> Function(T item) each, {
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error,
Member Author

I'm open to:

FutureOr<bool> Function(Exception e) continueIf

On the assumption that:

  • You should never ignore an Error (I could also be convinced to be less opinionated).
  • You should never consider the stack trace when deciding to continue.

But I could use help choosing a better name: breakIf, breakOnError, ignoreErrorIf, filterError, or errorIf?

Member

I'm not open to special-casing Exception. Especially not the raw Exception type.

If the user knows that each can throw a specific subtype of Exception, then each should be made to handle that, rather than failing and having a second function handle it (but not break iteration?).

I'd just not have this parameter at all.
And I'd make the first each to throw cancel the entire operation (we probably still need to wait for the rest of the pending futures to complete, but I'd cancel the stream subscription).

Member

@lrhn lrhn left a comment

The Notifier is neat. A simple primitive that can occasionally be useful.

The parallelForEach feels more specialized, and the error handling is not consistent with other similar APIs.
The arguments look like they apply to stream events, but onError only applies to the result of each. That shouldn't be necessary: if the user wants to catch errors from each, they can just wrap the function with error handling. They know better what to handle and what to do.
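
As an illustration of wrapping each with error handling, here is a minimal sketch. The helper name `ignoringTimeouts` is hypothetical, and a plain `await for` loop stands in for `parallelForEach`, which is not available outside this PR.

```dart
import 'dart:async';

/// Hypothetical helper: wraps [each] so that [TimeoutException]s are
/// swallowed, while every other error still propagates.
Future<void> Function(T) ignoringTimeouts<T>(
    FutureOr<void> Function(T item) each) {
  return (T item) async {
    try {
      await each(item);
    } on TimeoutException {
      // Deliberately ignored; the caller decided this is recoverable.
    }
  };
}

Future<void> main() async {
  final processed = <int>[];
  Future<void> each(int item) async {
    if (item == 2) throw TimeoutException('item $item timed out');
    processed.add(item);
  }

  final safeEach = ignoringTimeouts(each);
  // Stand-in for the iteration API under review.
  await for (final item in Stream.fromIterable([1, 2, 3])) {
    await safeEach(item);
  }
  print(processed); // [1, 3]
}
```

With this approach the decision about which errors to ignore lives next to the code that can throw them, and the iteration API itself needs no error callback.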

import 'package:meta/meta.dart';

/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to
/// [notify].
Member

Don't mention the microtask!

(Also, makes no sense. If a microtask is waiting for something, it's no longer the same microtask.)

So:

/// An asynchronous synchronization primitive.
///
/// Any number of asynchronous computations can
/// wait on the [wait] future, which will be
/// completed when someone calls [notify()].
/// After that, further reads of [wait] will provide
/// a new future which is then completed by the next
/// following call to [notify()].

// TODO: Apply `final` when language version for this library is bumped to 3.0
@sealed
class Notifier {
var _completer = Completer<void>();
Member

I'd make this nullable, setting it to non-null on the first wait, and setting it back to null on notify.

That way, if the first call is notify, you're not wasting a completer.
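
A minimal sketch of that suggestion, assuming nothing about the PR's actual implementation beyond what is quoted here. The class name `LazyNotifier` is hypothetical, chosen to avoid implying this is the code under review.

```dart
import 'dart:async';

/// Sketch of a notifier that allocates its [Completer] lazily.
class LazyNotifier {
  // Null until someone reads [wait]; reset to null by [notify].
  Completer<void>? _completer;

  /// Completes every future handed out by [wait] since the last call.
  void notify() {
    final completer = _completer;
    if (completer != null) {
      _completer = null;
      completer.complete();
    }
  }

  /// A future completed by the next call to [notify].
  Future<void> get wait => (_completer ??= Completer<void>()).future;
}

Future<void> main() async {
  final notifier = LazyNotifier();
  notifier.notify(); // No one is waiting, so no completer is allocated.

  var woken = 0;
  final first = notifier.wait.then((_) => woken++);
  final second = notifier.wait.then((_) => woken++);
  notifier.notify(); // Wakes both waiters at once.
  await Future.wait([first, second]);
  print(woken); // 2
}
```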

}
}

/// Wait for notification.
Member

Noun phrase for getters.

Maybe call it notification, since wait is also a verb.
But that is long, and wait is short.
Maybe next:

var tick = Notifier();
....
   tick.notify();
   ...
do {
  await tick.next;
} while (result != "success");


/// Wait for notification.
///
/// Returns a [Future] that will complete the next time [notify] is called.
Member

Don't use "Returns" about properties.

/// The `wait` [Future] is completed the next time [notify] is called.

}) async {
// Track the first error, so we rethrow when we're done.
Object? firstError;
StackTrace? firstStackTrace;
Member

I'd store this as (Object, StackTrace)? now, so we won't need to do if (firstError != null) ... (firstError, firstStackTrace!).

/// });
/// print('Folder size: $folderSize');
/// ```
Future<void> parallelForEach(
Member

Could return a Stream<void>, if you want to know how many operations have completed, and see all their errors.

Or make it:

Stream<R> parallelMap<R>(int maxParallel, FutureOr<R> mapData(T), {FutureOr<R> Function(Object error, StackTrace stack)? mapError});

which just calls mapData on each element and mapError on each error (or forwards it unchanged if omitted), and does not guarantee preserving order.
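
A rough sketch of how such a `parallelMap` could look, under several stated assumptions: the extension name `ParallelMapSketch` is hypothetical, the optional `mapError` parameter is left out (errors are simply forwarded), and pausing by the downstream listener is not propagated to the source. It is meant to show the shape of the idea, not a finished implementation.

```dart
import 'dart:async';

/// Hypothetical extension; `parallelMap` is not an existing API.
extension ParallelMapSketch<T> on Stream<T> {
  Stream<R> parallelMap<R>(
      int maxParallel, FutureOr<R> Function(T item) mapData) {
    late StreamSubscription<T> subscription;
    var pending = 0;
    var sourceDone = false;
    final controller = StreamController<R>(
      onCancel: () => subscription.cancel(),
    );

    void closeIfFinished() {
      if (sourceDone && pending == 0) controller.close();
    }

    controller.onListen = () {
      subscription = listen(
        (item) {
          // Stop pulling events once the limit is reached.
          if (++pending >= maxParallel) subscription.pause();
          Future<R>.sync(() => mapData(item))
              .then<void>(controller.add, onError: controller.addError)
              .whenComplete(() {
            pending--;
            subscription.resume(); // No-op when not paused.
            closeIfFinished();
          });
        },
        onError: controller.addError, // Source errors are forwarded as-is.
        onDone: () {
          sourceDone = true;
          closeIfFinished();
        },
      );
    };
    return controller.stream;
  }
}

Future<void> main() async {
  final results = await Stream.fromIterable([1, 2, 3, 4, 5])
      .parallelMap(2, (i) async {
    await Future<void>.delayed(Duration(milliseconds: 10 * (6 - i)));
    return i * i;
  }).toList();
  print(results); // The squares of 1..5, in completion order.
}
```

Results are emitted as each computation finishes, so the output order can differ from the input order, matching the "does not guarantee preserving order" note above.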


try {
var doBreak = false;
await for (final item in this) {
Member

This throws if the stream contains an error. User might expect that their onError would handle it.
(If anything, I'd expect the onError argument to apply to these errors, not errors created by each. The callbacks are provided at the same level, so one of them should not apply to the result of the other.)

} finally {
// When [each] is done, we decrement [running] and notify
running -= 1;
itemDone.notify();
Member

You don't really need notify for this.

Future<void> parallelForEach(
    int maxParallel,
    FutureOr<void> Function(T item) each) {
  var pending = 0; // Number of `each` futures still running.
  var done = false; // No more events will be processed.
  var result = Completer<void>.sync();
  (Object, StackTrace)? firstError;
  void complete() {
    assert(pending == 0);
    assert(done);
    if (firstError case (var error, var stack)) {
      result.completeError(error, stack);
    } else {
      result.complete(null);
    }
  }
  // Inside the extension, `this` is the stream being iterated.
  var subscription = this.listen(null, onDone: () {
    done = true;
    if (pending == 0) {
      complete();
    }
  });
  subscription
    ..onError((Object error, StackTrace stack) {
      subscription.cancel().ignore();
      done = true;
      firstError = (error, stack); // Takes precedence over user errors.
      if (pending == 0) complete();
    })
    ..onData((T value) {
      try {
        var computation = each(value);
        if (computation is Future) {
          // Pause the source while the limit is reached.
          if (++pending >= maxParallel) subscription.pause();
          computation.then((_) {
            if (--pending == 0 && done) complete();
            subscription.resume();
          }, onError: (Object error, StackTrace stack) {
            subscription.cancel().ignore();
            done = true;
            firstError ??= (error, stack);
            if (--pending == 0) complete();
          });
        }
      } catch (error, stack) {
        // `each` threw synchronously.
        subscription.cancel().ignore();
        done = true;
        firstError ??= (error, stack);
        if (pending == 0) complete();
      }
    });
  return result.future;
}
