-
Notifications
You must be signed in to change notification settings - Fork 735
/
Timeout.cs
121 lines (94 loc) · 4.64 KB
/
Timeout.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq
{
public static partial class AsyncEnumerableEx
{
/// <summary>
/// Wraps an async-enumerable sequence in an iterator that throws a <see cref="TimeoutException"/>
/// when a pending <c>MoveNextAsync</c> call on the source loses the race against a delay of
/// <paramref name="timeout"/>.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The sequence to guard with a timeout.</param>
/// <param name="timeout">
/// The time allowed for each pending <c>MoveNextAsync</c> call. Truncated to whole milliseconds,
/// it must lie between -1 and <see cref="int.MaxValue"/> (inclusive).
/// </param>
/// <returns>A sequence that forwards the elements of <paramref name="source"/>.</returns>
/// <remarks>
/// Arguments are validated eagerly, before any iterator is created: a null
/// <paramref name="source"/> is rejected through <c>Error.ArgumentNull</c> and an out-of-range
/// <paramref name="timeout"/> through <c>Error.ArgumentOutOfRange</c>.
/// </remarks>
public static IAsyncEnumerable<TSource> Timeout<TSource>(this IAsyncEnumerable<TSource> source, TimeSpan timeout)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    // Only the whole-millisecond part takes part in the range check; the original TimeSpan
    // (not the truncated value) is what gets handed to the iterator.
    var milliseconds = (long)timeout.TotalMilliseconds;
    var isInRange = milliseconds >= -1L && milliseconds <= int.MaxValue;

    if (!isInRange)
    {
        throw Error.ArgumentOutOfRange(nameof(timeout));
    }

    return new TimeoutAsyncIterator<TSource>(source, timeout);
}
/// <summary>
/// Iterator that forwards the elements of a source sequence and throws a
/// <see cref="TimeoutException"/> when a <c>MoveNextAsync</c> call on the source that did not
/// complete synchronously loses the race against a <see cref="Task.Delay(TimeSpan, CancellationToken)"/>
/// of the configured timeout.
/// </summary>
private sealed class TimeoutAsyncIterator<TSource> : AsyncIterator<TSource>
{
    private readonly IAsyncEnumerable<TSource> _source;
    private readonly TimeSpan _timeout;

    private IAsyncEnumerator<TSource>? _enumerator;

    // Set only when a MoveNextAsync call timed out. Completes once that call has finished AND the
    // source enumerator has subsequently been disposed.
    private Task? _loserTask;

    /// <summary>Creates an iterator over <paramref name="source"/> using the given per-call timeout.</summary>
    /// <param name="source">The sequence to enumerate.</param>
    /// <param name="timeout">The delay each pending <c>MoveNextAsync</c> call is raced against.</param>
    public TimeoutAsyncIterator(IAsyncEnumerable<TSource> source, TimeSpan timeout)
    {
        _source = source;
        _timeout = timeout;
    }

    /// <summary>Creates a fresh iterator over the same source with the same timeout.</summary>
    public override AsyncIteratorBase<TSource> Clone()
    {
        return new TimeoutAsyncIterator<TSource>(_source, _timeout);
    }

    /// <summary>
    /// Disposes the source enumerator. After a timeout, this waits for the timed-out
    /// <c>MoveNextAsync</c> call to finish and for the enumerator disposal that follows it.
    /// </summary>
    public override async ValueTask DisposeAsync()
    {
        // Detach the state before awaiting so that a repeated DisposeAsync call (e.g. after a
        // failing disposal) neither re-awaits a faulted task nor disposes the enumerator twice.
        var loserTask = _loserTask;
        var enumerator = _enumerator;

        _loserTask = null;
        _enumerator = null;

        try
        {
            if (loserTask != null)
            {
                // The enumerator is disposed by the continuation behind loserTask; it must not be
                // disposed here while its MoveNextAsync call may still be in flight.
                await loserTask.ConfigureAwait(false);
            }
            else if (enumerator != null)
            {
                await enumerator.DisposeAsync().ConfigureAwait(false);
            }
        }
        finally
        {
            // Run the base class cleanup even when disposing the source enumerator throws.
            await base.DisposeAsync().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Advances the source enumerator, racing any pending <c>MoveNextAsync</c> call against the timeout.
    /// </summary>
    /// <returns><c>true</c> if an element was produced; <c>false</c> once the source is exhausted.</returns>
    /// <exception cref="TimeoutException">The source did not produce a result before the delay elapsed.</exception>
    protected override async ValueTask<bool> MoveNextCore()
    {
        switch (_state)
        {
            case AsyncIteratorState.Allocated:
                _enumerator = _source.GetAsyncEnumerator(_cancellationToken);

                _state = AsyncIteratorState.Iterating;
                goto case AsyncIteratorState.Iterating;

            case AsyncIteratorState.Iterating:
                var enumerator = _enumerator!;
                var moveNext = enumerator.MoveNextAsync();

                bool hasNext;

                if (moveNext.IsCompleted)
                {
                    // Already finished: no need to set up a timer; consume the ValueTask directly.
                    hasNext = await moveNext.ConfigureAwait(false);
                }
                else
                {
                    using var delayCts = new CancellationTokenSource();

                    var delay = Task.Delay(_timeout, delayCts.Token);

                    // NB: A ValueTask may be consumed only once. From here on, only the Task
                    //     obtained from AsTask is used; moveNext itself is never awaited again.
                    var next = moveNext.AsTask();

                    var winner = await Task.WhenAny(next, delay).ConfigureAwait(false);

                    if (winner == delay)
                    {
                        // NB: We still have to wait for the MoveNextAsync operation to complete before we can
                        //     dispose the enumerator. The resulting task will be used by DisposeAsync. Also note
                        //     that throwing an exception here causes a call to DisposeAsync, where we pick up
                        //     the task prepared below.

                        // NB: Any exception reported by a timed out MoveNextAsync operation won't be reported
                        //     to the caller, but the task's exception is not marked as observed, so unhandled
                        //     exception handlers can still observe the exception.

                        // REVIEW: Should exceptions reported by a timed out MoveNextAsync operation come out
                        //         when attempting to call DisposeAsync?

                        // NB: ContinueWith yields a Task<Task> here; Unwrap makes the stored task complete
                        //     only when the enumerator's disposal itself has completed (and surfaces its
                        //     failure), rather than as soon as the disposal has merely been started. The
                        //     scheduler is pinned so the continuation does not depend on the ambient
                        //     TaskScheduler.Current.
                        _loserTask = next.ContinueWith(
                            (_, state) => ((IAsyncDisposable)state!).DisposeAsync().AsTask(),
                            enumerator,
                            CancellationToken.None,
                            TaskContinuationOptions.None,
                            TaskScheduler.Default).Unwrap();

                        throw new TimeoutException();
                    }

                    // The source won the race; stop the timer behind the delay task.
                    delayCts.Cancel();

                    hasNext = await next.ConfigureAwait(false);
                }

                if (hasNext)
                {
                    _current = enumerator.Current;
                    return true;
                }

                break;
        }

        await DisposeAsync().ConfigureAwait(false);
        return false;
    }
}
}
}