[Umbrella] Work items and test plan for async streams #24037

Closed
jcouv opened this issue Jan 4, 2018 · 4 comments

jcouv commented Jan 4, 2018

Spec: https://github.com/dotnet/csharplang/blob/master/proposals/csharp-8.0/async-streams.md
Championed issue: dotnet/csharplang#43

Notes on cancellation token and [EnumeratorCancellation]: http://blog.monstuff.com/archives/2019/03/async-enumerables-with-cancellation.html

FAQ and known issues


Async-iterator methods

Async using and foreach

Productivity (code fixers/refactorings/etc):

LDM open issues:

  • async LINQ
    • Parsing issue with return from e in async-collection select await e + 1; // await isn't a keyword
    • how does above scenario and return from e in async-collection select e + 1; lower into LINQ APIs? (the one with await involves Task<int> and the other one involves int directly) How many overloads of Select do we need?
  • Should there be some helper method to convert from IEnumerable to IAsyncEnumerable, or from Task to IAsyncEnumerable?
  • Should the generated token checks be in GetAsyncEnumerator or in MoveNextAsync? (if we do, we need to store the token and maybe dispose it too) (answer: no)
  • Revisit blocking off a word (either in parameter list, like params, or for variable name, like value) for token (answer: we're not going to use a keyword)
  • pattern-based await using should recognize a DisposeAsync method that returns a task-like (or only ValueTask)? (not applicable because no ref structs in async methods)
  • Should pattern-based foreach recognize ... GetAsyncEnumerator() (without CancellationToken)? (yes, LDM 1/9)
  • What attributes should be emitted on async-iterator state machines? (answer: AsyncIteratorStateMachineAttribute)
  • cancellation of async-streams (both as consumer and producer) (see LDM 11/28)
  • confirm syntax for async foreach
  • Should we disallow struct async enumerator? (no, same as regular async, see TestWithPattern_WithStruct_MoveNextAsyncReturnsTask)
  • Should DisposeAsync return a non-generic ValueTask, since there is now one? Or stick with Task?
  • Extension methods for GetAsyncEnumerator, WaitForNextAsync, and TryGetNext do not contribute. This mirrors behavior for regular foreach. But I'd like to confirm. (answer: this is probably something we want to support. Let's queue that for later in the implementation)
  • Should the pattern for async enumerators also recognize when a task-like is returned instead of Task<bool>? (answer: yes)
  • I think we'll need to block dynamic since there is no async counterpart to the non-generic IEnumerable that would convert dynamic to. (answer: seems ok)
  • Do we need async keyword on async iterator methods? I assume yes.
  • Since there are no enumerable (iterator) lambdas, I assume the same for async-enumerable. (answer: correct. No async iterator lambda)
  • async-iterator without yield or await, should warn? (answer: without yield it's not recognized as an iterator, warn when no await)
  • I suspect we'll need to declare the loop variable differently: calling TryGetNext first, then checking success and only then dealing with conversions and deconstructions.

Championed issue: dotnet/csharplang#43 (includes LDM notes)

Test ideas for async foreach:

  • Verify that async-dispose doesn't have a similar bug with struct resource

Test ideas for async using:

  • Look up Lippert's blog on using with struct or generic type T
  • Does the pattern or the interface get picked up first? Is it observable? (maybe if the pattern allows task-like returning method)

Test ideas for async iterators:

  • From Andy: we should have at least one test that runs using a non-trivial sync context.
  • From Andy: would it be useful to emit asserts for invalid/unexpected states for at least a little while? We could do it only for debug codegen.
  • Test with yield or await in try/catch/finally
  • More tests with exception thrown in async-iterator
  • There is a case in GetIteratorElementType with IsDirectlyInIterator that relates to speculation, needs testing
  • yield break disallowed in finally and top-level script (see BindYieldBreakStatement); same for yield return (see BindYieldReturnStatement)
  • binding for yield return (BindYieldReturnStatement) validates escape rules, needs testing
  • test yield in async lambda (still error)
  • test with IAsyncEnumerable<dynamic>
  • other tests with dynamic?
  • test should cover both case with AwaitOnCompleted and AwaitUnsafeOnCompleted
  • test async IAsyncEnumerable<int> M() { return TaskLike(); }
  • Can we avoid making IAsyncEnumerable<T> special from the start? Maybe mark it with an attribute like we did for task-like?
  • Do some manual validation on debugging scenarios, including with exceptions (thrown after yield and after await).
  • Test with one or both of the thread ID APIs missing.

BCL (Core)

BCL (mono)

BCL (package)


References:

@jcouv jcouv added the New Feature - Async Streams Async Streams label Jan 4, 2018
@jcouv jcouv added this to the 16.0 milestone Jan 4, 2018
@jcouv jcouv self-assigned this Jan 4, 2018
@jcouv jcouv changed the title [Umbrella] Work items for async streams [Umbrella] Work items and test plan for async streams Apr 21, 2018

jcouv commented Apr 26, 2018

Notes from chat with Stephen:

The promise should not just be a TaskCompletionSource. It should be possible to make a re-usable ValueTask<bool> and completion machinery, thus avoiding allocation entirely.
See ManualResetValueTaskSource.cs in corefx. It should be based on IValueTaskSource which should ship in .NET Core 2.1 and down-level (.NET Standard 1.1 or maybe 1.0).

WaitForNextAsync (or MoveNextAsync) would return a ValueTask<bool>. ValueTask<bool> can be backed by a bool, a Task<bool>, or an IValueTaskSource<bool>. So to return the ValueTask<bool> wrapping the Unpronounceable, the Unpronounceable would need to implement IValueTaskSource<bool> so that it can be the backing implementation for that ValueTask<bool>.
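
To make the three backings concrete, here is a minimal sketch. It assumes a runtime that ships `ManualResetValueTaskSourceCore<T>` (.NET Core 3.0 or later), which did not exist when this note was written; `ReusablePromise` and `ValueTaskBackings` are hypothetical names standing in for the Unpronounceable, not the compiler's actual generated type.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical reusable promise: an object that is its own IValueTaskSource<bool>.
public sealed class ReusablePromise : IValueTaskSource<bool>
{
    // Mutable struct holding the completion machinery; must not be readonly.
    private ManualResetValueTaskSourceCore<bool> _core;

    // Hands out a ValueTask<bool> tied to the current version of the source.
    public ValueTask<bool> AsValueTask() => new ValueTask<bool>(this, _core.Version);

    public void SetResult(bool value) => _core.SetResult(value);

    // Makes the same object usable for the next operation (bumps the version).
    public void Reset() => _core.Reset();

    bool IValueTaskSource<bool>.GetResult(short token) => _core.GetResult(token);

    ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token) => _core.GetStatus(token);

    void IValueTaskSource<bool>.OnCompleted(
        Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}

public static class ValueTaskBackings
{
    public static async Task<string> RunAsync()
    {
        // 1. Backed by a plain bool: already completed, nothing allocated.
        bool fromValue = await new ValueTask<bool>(true);

        // 2. Backed by a Task<bool>.
        bool fromTask = await new ValueTask<bool>(Task.FromResult(false));

        // 3. Backed by an IValueTaskSource<bool>, reused for two operations.
        var promise = new ReusablePromise();
        ValueTask<bool> first = promise.AsValueTask();
        promise.SetResult(true);
        bool a = await first;

        promise.Reset();
        ValueTask<bool> second = promise.AsValueTask();
        promise.SetResult(false);
        bool b = await second;

        return $"{fromValue} {fromTask} {a} {b}";
    }

    public static async Task Main() => Console.WriteLine(await RunAsync());
}
```

The point of the third case is that one object serves both operations, so no promise is allocated per call. Each `ValueTask<bool>` is awaited exactly once, which is what the version token enforces.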

It is possible that .NET Core 2.2 would introduce some reusable type(s) for abstracting this sort of stuff. Then we may not need to inline all the extra fields and low-level logic.
As long as the compiler-generated code has a way to tell whether there is no promise, then we’re good. That seems to be possible.
We need to discuss this type dependency.

We’ll have a base type with common machinery (as both our prototypes did). We should discuss how this type will arrive in the compilation (always generate, or generate if not referenced, or always use reference?).
If the API can be referenced, then we need to lock the API. This question also affects the API dependency above.

For ConfigureAwait, we could have an extension ConfiguredValueTaskAwaitable<bool> ConfigureAwait(this IValueTaskSource), and ConfiguredValueTaskAwaitable<T> is a task-like.
So we’d need the pattern-based async-foreach to recognize WaitForNextAsync methods that return a task-like.
Need to confirm with LDM.

For CancellationToken, we could just treat the token as a regular local, captured and available to use in the user code.
There may have been some pushback on this (Bart, maybe Lucien). Not clear what the problem is (composability) or what the alternative would be (state machine intrinsics?).
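
For reference, the approach described in the `[EnumeratorCancellation]` notes linked at the top of this issue keeps the token as an ordinary parameter that user code can read. The sketch below assumes the shipped C# 8 surface (`IAsyncEnumerable<T>`, `await foreach`, `WithCancellation`) rather than the prototype API discussed here; `CancellationDemo`, `CountAsync` and `ConsumeAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // The token is a regular parameter, captured like any other local.
    // [EnumeratorCancellation] lets the token given to GetAsyncEnumerator
    // (here via WithCancellation) flow into this parameter.
    public static async IAsyncEnumerable<int> CountAsync(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; ; i++)
        {
            token.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield();
        }
    }

    public static async Task<int> ConsumeAsync()
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            await foreach (int i in CountAsync().WithCancellation(cts.Token))
            {
                seen++;
                if (i == 2)
                {
                    // The producer observes this at its next token check.
                    cts.Cancel();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the producer stopped because the consumer cancelled.
        }
        return seen;
    }

    public static async Task Main() => Console.WriteLine(await ConsumeAsync());
}
```

The consumer sees items 0, 1 and 2 before the producer throws, so `ConsumeAsync` returns 3.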

For GetAsyncEnumerator(), we can use the same logic as currently used in the unpronounceable iterator type.
But should read this thread (https://github.com/dotnet/corefx/issues/3481) and confirm with Vlad.
The thread check is meant to emulate an interlocked exchange. We should confirm this is indeed better than interlocked.
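
As a rough illustration of that thread check, here is a hand-written stand-in for a generated iterator type. `UpTo`, `NotStarted` and `ThreadCheckDemo` are hypothetical names, and the real compiler-generated code differs in detail; the sketch only shows how comparing thread IDs can decide whether the original instance may be handed out as its own enumerator.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical stand-in for a compiler-generated iterator type.
public sealed class UpTo : IEnumerable<int>, IEnumerator<int>
{
    private const int NotStarted = -2; // created as an enumerable, no enumerator handed out yet
    private readonly int _limit;
    private readonly int _initialThreadId;
    private int _state;
    private int _next;
    private int _current;

    public UpTo(int limit) : this(limit, NotStarted) { }

    private UpTo(int limit, int state)
    {
        _limit = limit;
        _state = state;
        _initialThreadId = Environment.CurrentManagedThreadId;
    }

    public IEnumerator<int> GetEnumerator()
    {
        // Reuse this instance only for the first call made on the creating thread.
        // A second call, or a call from another thread, gets a fresh instance.
        // The thread comparison stands in for an Interlocked.CompareExchange on _state.
        if (_state == NotStarted && _initialThreadId == Environment.CurrentManagedThreadId)
        {
            _state = 0;
            return this;
        }
        return new UpTo(_limit, 0);
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    public int Current => _current;
    object IEnumerator.Current => _current;

    public bool MoveNext()
    {
        if (_next >= _limit) return false;
        _current = _next++;
        return true;
    }

    public void Reset() => throw new NotSupportedException();
    public void Dispose() { }
}

public static class ThreadCheckDemo
{
    // Reports whether the first and second enumerators are the enumerable itself.
    public static (bool firstReused, bool secondReused) Run()
    {
        var source = new UpTo(3);
        IEnumerator<int> first = source.GetEnumerator();
        IEnumerator<int> second = source.GetEnumerator();
        return (ReferenceEquals(first, source), ReferenceEquals(second, source));
    }

    public static void Main() => Console.WriteLine(Run());
}
```

Only the first call on the creating thread reuses the instance, so `Run` returns `(True, False)`.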

We discussed trade-off between WaitForNextAsync/TryGetNext vs. MoveNextAsync/Current patterns.
It just comes down to perf (number of interface calls) vs. pattern complexity (for example, when implementing LINQ methods such as Group that aren’t straight foreach loops).
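
The consumer side of that trade-off can be sketched as follows. The two interfaces mirror the shapes named above but use hypothetical names (`IBatchAsyncEnumerator<T>`, `IStepAsyncEnumerator<T>`), and `BatchedSource` is an in-memory source invented for illustration that counts how many awaited calls each pattern makes.

```csharp
using System;
using System.Threading.Tasks;

// Shape 1: one awaited call per batch, then synchronous draining.
public interface IBatchAsyncEnumerator<out T>
{
    ValueTask<bool> WaitForNextAsync();
    T TryGetNext(out bool success);
}

// Shape 2: one awaited call per element.
public interface IStepAsyncEnumerator<out T>
{
    ValueTask<bool> MoveNextAsync();
    T Current { get; }
}

// Hypothetical source over pre-built batches; use a fresh instance per pattern.
public sealed class BatchedSource : IBatchAsyncEnumerator<int>, IStepAsyncEnumerator<int>
{
    private readonly int[][] _batches;
    private int _batch = -1;
    private int _index;

    public BatchedSource(params int[][] batches) => _batches = batches;

    public int AsyncCalls { get; private set; }
    public int Current { get; private set; }

    public ValueTask<bool> WaitForNextAsync()
    {
        AsyncCalls++;
        _batch++;
        _index = 0;
        return new ValueTask<bool>(_batch < _batches.Length);
    }

    public int TryGetNext(out bool success)
    {
        success = _batch < _batches.Length && _index < _batches[_batch].Length;
        return success ? _batches[_batch][_index++] : default;
    }

    public ValueTask<bool> MoveNextAsync()
    {
        AsyncCalls++;
        if (_batch < 0) _batch = 0;
        // Skip over exhausted batches.
        while (_batch < _batches.Length && _index >= _batches[_batch].Length)
        {
            _batch++;
            _index = 0;
        }
        if (_batch >= _batches.Length) return new ValueTask<bool>(false);
        Current = _batches[_batch][_index++];
        return new ValueTask<bool>(true);
    }
}

public static class PatternDemo
{
    // Nested loops: more complex for the consumer, fewer awaited calls.
    public static async Task<(int sum, int calls)> DrainBatchedAsync(BatchedSource e)
    {
        int sum = 0;
        while (await e.WaitForNextAsync())
        {
            while (true)
            {
                int item = e.TryGetNext(out bool success);
                if (!success) break;
                sum += item;
            }
        }
        return (sum, e.AsyncCalls);
    }

    // Single loop: simpler, but one awaited call per element.
    public static async Task<(int sum, int calls)> DrainSteppedAsync(BatchedSource e)
    {
        int sum = 0;
        while (await e.MoveNextAsync()) sum += e.Current;
        return (sum, e.AsyncCalls);
    }

    public static async Task Main()
    {
        Console.WriteLine(await DrainBatchedAsync(new BatchedSource(new[] { 1, 2, 3 }, new[] { 4, 5 })));
        Console.WriteLine(await DrainSteppedAsync(new BatchedSource(new[] { 1, 2, 3 }, new[] { 4, 5 })));
    }
}
```

For five items in two batches, the batched pattern makes 3 awaited calls and the stepped pattern makes 6, both summing to 15. The nested loop is the complexity cost that every consumer, including LINQ-style operators, would have to carry.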

jcouv commented Oct 2, 2018

Notes on testing the feature in dev hive:

  • add a reference to NuGet package System.Threading.Tasks.Extensions (I've used 4.5.1)
  • add the following code to provide types (including IAsyncEnumerable<T> and ManualResetValueTaskSourceLogic<T>):
namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : System.IAsyncDisposable
    {
        System.Threading.Tasks.ValueTask<bool> WaitForNextAsync();
        T TryGetNext(out bool success);
    }
}
namespace System
{
    public interface IAsyncDisposable
    {
        System.Threading.Tasks.ValueTask DisposeAsync();
    }
}

namespace System.Runtime.CompilerServices
{
    public interface IStrongBox<T>
    {
        ref T Value { get; }
    }
}

namespace System.Threading.Tasks
{
    using System.Runtime.CompilerServices;
    using System.Runtime.ExceptionServices;
    using System.Threading.Tasks.Sources;

    public struct ManualResetValueTaskSourceLogic<TResult>
    {
        private static readonly Action<object> s_sentinel = new Action<object>(s => throw new InvalidOperationException());

        private readonly IStrongBox<ManualResetValueTaskSourceLogic<TResult>> _parent;
        private Action<object> _continuation;
        private object _continuationState;
        private object _capturedContext;
        private ExecutionContext _executionContext;
        private bool _completed;
        private TResult _result;
        private ExceptionDispatchInfo _error;
        private short _version;

        public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent)
        {
            _parent = parent ?? throw new ArgumentNullException(nameof(parent));
            _continuation = null;
            _continuationState = null;
            _capturedContext = null;
            _executionContext = null;
            _completed = false;
            _result = default;
            _error = null;
            _version = 0;
        }

        public short Version => _version;

        private void ValidateToken(short token)
        {
            if (token != _version)
            {
                throw new InvalidOperationException();
            }
        }

        public ValueTaskSourceStatus GetStatus(short token)
        {
            ValidateToken(token);

            return
                !_completed ? ValueTaskSourceStatus.Pending :
                _error == null ? ValueTaskSourceStatus.Succeeded :
                _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
                ValueTaskSourceStatus.Faulted;
        }

        public TResult GetResult(short token)
        {
            ValidateToken(token);

            if (!_completed)
            {
                throw new InvalidOperationException();
            }

            _error?.Throw();
            return _result;
        }

        public void Reset()
        {
            _version++;

            _completed = false;
            _continuation = null;
            _continuationState = null;
            _result = default;
            _error = null;
            _executionContext = null;
            _capturedContext = null;
        }

        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        {
            if (continuation == null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }
            ValidateToken(token);

            if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
            {
                _executionContext = ExecutionContext.Capture();
            }

            if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
            {
                SynchronizationContext sc = SynchronizationContext.Current;
                if (sc != null && sc.GetType() != typeof(SynchronizationContext))
                {
                    _capturedContext = sc;
                }
                else
                {
                    TaskScheduler ts = TaskScheduler.Current;
                    if (ts != TaskScheduler.Default)
                    {
                        _capturedContext = ts;
                    }
                }
            }

            _continuationState = state;
            if (Interlocked.CompareExchange(ref _continuation, continuation, null) != null)
            {
                _executionContext = null;

                object cc = _capturedContext;
                _capturedContext = null;

                switch (cc)
                {
                    case null:
                        Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
                        break;

                    case SynchronizationContext sc:
                        sc.Post(s =>
                        {
                            var tuple = (Tuple<Action<object>, object>)s;
                            tuple.Item1(tuple.Item2);
                        }, Tuple.Create(continuation, state));
                        break;

                    case TaskScheduler ts:
                        Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                        break;
                }
            }
        }

        public void SetResult(TResult result)
        {
            _result = result;
            SignalCompletion();
        }

        public void SetException(Exception error)
        {
            _error = ExceptionDispatchInfo.Capture(error);
            SignalCompletion();
        }

        private void SignalCompletion()
        {
            if (_completed)
            {
                throw new InvalidOperationException();
            }
            _completed = true;

            if (Interlocked.CompareExchange(ref _continuation, s_sentinel, null) != null)
            {
                if (_executionContext != null)
                {
                    ExecutionContext.Run(
                        _executionContext,
                        s => ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value.InvokeContinuation(),
                        _parent ?? throw new InvalidOperationException());
                }
                else
                {
                    InvokeContinuation();
                }
            }
        }

        private void InvokeContinuation()
        {
            object cc = _capturedContext;
            _capturedContext = null;

            switch (cc)
            {
                case null:
                    _continuation(_continuationState);
                    break;

                case SynchronizationContext sc:
                    sc.Post(s =>
                    {
                        ref ManualResetValueTaskSourceLogic<TResult> logicRef = ref ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value;
                        logicRef._continuation(logicRef._continuationState);
                    }, _parent ?? throw new InvalidOperationException());
                    break;

                case TaskScheduler ts:
                    Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                    break;
            }
        }
    }
}

jcouv commented Nov 29, 2018

Some notes on WIP design:

For DisposeAsync implementation, we need to be able to jump to a specific finally corresponding to the current state and continue execution from there, but executing only the code in finally clauses.
We call this alternative way of executing the state machine "dispose mode".

Update:
It turns out that we already have routing logic that allows us to restore the execution at a certain label, even within try statements.
So the design below can be simplified. We only need to:

  1. add logic after a yield return to jump to the enclosing finally when in dispose mode,
  2. add logic to jump after a finally clause to the parent's finally-entry label when in dispose mode.

The original design is comprised of four parts:

  1. We add labels just before try statements (try-entry)
  2. We add routing logic at the beginning of try blocks to jump to those try-entry labels when in dispose mode.
  3. We add labels at the end of try blocks (finally-entry)
    (since IL does not allow jumping into or out of try/finally/catch blocks)
  4. We add logic to jump after a finally clause to the parent's finally-entry label when in dispose mode.

For:

 try
 {
     try { ... }
     finally { }
     ...
     try { ... }
     finally { }
     ...
 }
 finally { }

We adjust the lowering logic like this:

 tryEntry1:
 try
 {
     if (disposeMode) { /* route based on state to tryEntry2 or tryEntry3 */ }

     tryEntry2: /*part 1*/
     try
     {
         if (disposeMode) goto finallyEntry2; // simplified routing /*part 2*/
         ...
         finallyEntry2: /*part 3*/
     }
     finally { }
     if (disposeMode) goto finallyEntry1; /*part 4*/

     ...

     tryEntry3:
     try
     {
         if (disposeMode) goto finallyEntry3; // simplified routing
         ...
         finallyEntry3:
     }
     finally { }
     if (disposeMode) goto finallyEntry1;

     ...

     finallyEntry1:
 }
 finally { }

Then DisposeAsync just needs to set disposeMode to true, launch the state machine (via builder) and return the builder's Task.

Note: only try statements that have a finally and contain a yield return need to handle dispose mode,
since disposal should only be triggered when the machine is paused and control was given back to the caller.
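
Seen from the consumer's side, the behavior that dispose mode has to produce looks like the sketch below. It assumes the `await foreach` syntax and `IAsyncEnumerable<T>` shape that shipped in C# 8, not the prototype API from the earlier comment; `DisposeModeDemo` and `Numbers` are hypothetical names. Leaving the loop early calls DisposeAsync while the iterator is paused at the first yield return, so only the enclosing finally clauses should run, innermost first.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DisposeModeDemo
{
    // Async iterator with nested try/finally around its yield returns.
    private static async IAsyncEnumerable<int> Numbers(List<string> log)
    {
        try
        {
            try
            {
                yield return 1; // the consumer stops here
                await Task.Yield();
                yield return 2; // never reached in this demo
            }
            finally
            {
                log.Add("inner finally");
            }
            yield return 3; // skipped: disposal runs finally clauses only
        }
        finally
        {
            log.Add("outer finally");
        }
    }

    public static async Task<string> RunAsync()
    {
        var log = new List<string>();
        await foreach (int n in Numbers(log))
        {
            log.Add($"got {n}");
            break; // exits the loop, which awaits DisposeAsync on the enumerator
        }
        return string.Join(", ", log);
    }

    public static async Task Main() => Console.WriteLine(await RunAsync());
}
```

`RunAsync` returns `got 1, inner finally, outer finally`: the code between the two finally clauses, including `yield return 3`, is not executed.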

jcouv commented Sep 19, 2019

Individual issues have been opened for follow-ups (optimizations and improved diagnostics).
