Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool-bound forEach for Stream and Iterable #6765

Closed
wants to merge 5 commits into from

Conversation

isoos
Copy link
Collaborator

@isoos isoos commented Jun 15, 2023

  • replaced most Pool use in app/
  • also limiting the number of pending operations in the pool

@isoos isoos requested a review from jonasfj June 15, 2023 15:14
app/lib/shared/utils.dart Outdated Show resolved Hide resolved
app/lib/shared/utils.dart Outdated Show resolved Hide resolved
app/lib/shared/utils.dart Outdated Show resolved Hide resolved
app/lib/shared/utils.dart Outdated Show resolved Hide resolved
app/lib/shared/utils.dart Outdated Show resolved Hide resolved
app/lib/shared/utils.dart Outdated Show resolved Hide resolved
await Future.wait(futures);
futures.clear();
await recentlyUpdated.entries.boundedForEach(concurrency, (e) async {
await updatePackage(e.key, e.value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await updatePackage(e.key, e.value);
if (!claim.valid) {
return;
}
await updatePackage(e.key, e.value);

(package) async {
if (package.isNotVisible) {
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
}
if (!claim.valid) {
return;
}

Comment on lines +377 to +380
// early exit on the first error
if (firstError != null) {
break;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure we want to do this?

All of the code you're replacing follows a variant of running a loop with:

        final f = pool.withResource(() => /* do something... */);
        futures.add(f);

And then doing Future.wait(futures).

You existing loops won't exit early. If we want to change semantics in every place where we do a loop like this, I'm down for hearing the arguments. But I think it might be different on a case by case basis.

In package:async I'm leaning towards onError which defaults to a function that throws the error.

See:
https://github.com/dart-lang/async/blob/45c6b66fdd6e294883b2353a0c513fb139aa89f1/lib/src/stream_extensions.dart#L99

But then all our invocations would be like:

    await page!.items.boundedForEach(concurrency ?? 1, (entry) async {
      final deleted = await deleteFromBucket(bucket, entry.name);
      if (deleted) count++;
    }, onError: (e, st) => null);

Which gets formatted something like:

    await page!.items.boundedForEach(
      concurrency ?? 1,
      (entry) async {
        final deleted = await deleteFromBucket(bucket, entry.name);
        if (deleted) count++;
      },
      onError: (e, st) => null,
    );

with trailing comma.

Where you currently have:

    await page!.items.boundedForEach(concurrency ?? 1, (entry) async {
      final deleted = await deleteFromBucket(bucket, entry.name);
      if (deleted) count++;
    });

But this assumes we default to not exit early on errors (assuming we want to preserve semantics).

@isoos
Copy link
Collaborator Author

isoos commented Oct 18, 2023

I think we could close this in favor of dart-lang/async#249

@jonasfj
Copy link
Member

jonasfj commented Oct 18, 2023

@isoos yes, but feel free to just copy paste from dart-lang/async#249, it might be a while before it lands :D

@isoos isoos closed this Oct 23, 2023
@isoos isoos deleted the pooling branch October 23, 2023 08:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants