-
Notifications
You must be signed in to change notification settings - Fork 3
/
PipelineExtensions.cs
99 lines (86 loc) · 3.5 KB
/
PipelineExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Extension methods for the <c>Pipelines</c> sample: a console-driven TCP client
/// (<see cref="Send"/>), a TCP listener (<see cref="Receive"/>) and helpers that split
/// received bytes into newline-terminated lines and print them.
/// </summary>
static class PipelinesExtensions
{
    // Loopback port shared by the sender and the listener.
    private const int Port = 8087;

    // Maximum length of the pending-connections queue passed to Socket.Listen.
    private const int ListenBacklog = 120;

    /// <summary>
    /// Writes a short, fixed description of the System.IO.Pipelines features to
    /// <paramref name="writer"/>.
    /// </summary>
    /// <param name="runnable">Not used; present so the method can be called as an extension.</param>
    /// <param name="writer">Destination for the explanatory text.</param>
    public static void Explain(this Pipelines runnable, TextWriter writer)
    {
        writer.WriteLine(@"
- All buffer management is delegated to the `PipeReader`/`PipeWriter` implementations.`
- Besides handling the memory management, the other core pipelines feature is the ability to peek at data in the Pipe without actually consuming it.
- `FlushAsync` provides back pressure and flow control. PipeWriter.FlushAsync “blocks” when the amount of data in the Pipe crosses PauseWriterThreshold and “unblocks” when it becomes lower than ResumeWriterThreshold.
- `PipeScheduler` gives fine grained control over scheduling the IO.
");
    }

    /// <summary>
    /// Prints one received line to the console, prefixed with the remote endpoint of
    /// <paramref name="socket"/> in the form <c>[endpoint]: text</c>.
    /// </summary>
    /// <param name="runnable">Not used; present so the method can be called as an extension.</param>
    /// <param name="socket">Connection the line was received on; only its remote endpoint is read.</param>
    /// <param name="buffer">Bytes of the line (without the terminating <c>\n</c>), decoded as UTF-8.</param>
    public static void ProcessLine(this Pipelines runnable, Socket socket, in ReadOnlySequence<byte> buffer)
    {
        // Decode the line as a whole. Decoding each segment independently corrupts any
        // multi-byte UTF-8 character whose bytes straddle a segment boundary.
        string text = buffer.IsSingleSegment
            ? Encoding.UTF8.GetString(buffer.First.Span)
            : Encoding.UTF8.GetString(buffer.ToArray());

        // Emit prefix, text and newline in a single call so that lines printed for
        // different connections cannot interleave mid-line.
        Console.WriteLine($"[{socket.RemoteEndPoint}]: {text}");
    }

    /// <summary>
    /// Prints every complete, <c>\n</c>-terminated line found in <paramref name="buffer"/>
    /// via <see cref="ProcessLine"/>. Bytes after the last <c>\n</c> are not printed.
    /// </summary>
    /// <remarks>
    /// <paramref name="buffer"/> is passed by value, so the caller's sequence is not
    /// modified and this method does not report how many bytes were processed.
    /// </remarks>
    /// <param name="runnable">Receiver forwarded to <see cref="ProcessLine"/>.</param>
    /// <param name="socket">Connection the data was received on.</param>
    /// <param name="buffer">Received bytes to scan for line terminators.</param>
    public static void ReadUntilEOLAndOutputToConsole(this Pipelines runnable, Socket socket, ReadOnlySequence<byte> buffer)
    {
        while (true)
        {
            SequencePosition? endOfLine = buffer.PositionOf((byte)'\n');
            if (endOfLine == null)
            {
                // No further terminator: whatever is left is an incomplete line.
                return;
            }

            ReadOnlySequence<byte> line = buffer.Slice(0, endOfLine.Value);
            runnable.ProcessLine(socket, line);

            // Step one byte past the '\n' and continue with the remainder.
            SequencePosition afterTerminator = buffer.GetPosition(1, endOfLine.Value);
            buffer = buffer.Slice(afterTerminator);
        }
    }

    /// <summary>
    /// Connects to the loopback listener and sends each line read from the console,
    /// terminated with <see cref="Environment.NewLine"/>, until the user types
    /// <c>exit</c> or console input ends. Afterwards cancels <paramref name="tokenSource"/>.
    /// </summary>
    /// <param name="runnable">Not used; present so the method can be called as an extension.</param>
    /// <param name="tokenSource">Cancelled once sending has finished.</param>
    /// <returns>A task that completes when sending has finished.</returns>
    public static async Task Send(this Pipelines runnable, CancellationTokenSource tokenSource)
    {
        // The using block guarantees the client socket is released, even when
        // connecting or sending throws.
        using (var clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp))
        {
            await clientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, Port));
            Console.WriteLine($"Connecting to port {Port}");

            string line;
            // Console.ReadLine returns null when the input stream is exhausted
            // (redirected input, Ctrl+Z / Ctrl+D); treat that like "exit".
            while ((line = Console.ReadLine()) != null && line != "exit")
            {
                // Encode as UTF-8 because ProcessLine decodes received bytes as UTF-8;
                // ASCII would replace every non-ASCII character with '?'.
                var payload = Encoding.UTF8.GetBytes(line.Replace(Environment.NewLine, string.Empty) + Environment.NewLine);
                await clientSocket.SendAsync(new ArraySegment<byte>(payload), SocketFlags.None);
            }

            tokenSource.Cancel();
            Console.WriteLine("Send done");
        }
    }

    /// <summary>
    /// Listens on the loopback port and starts <c>ProcessLinesAsync</c> (defined
    /// elsewhere) for every accepted connection until <paramref name="token"/> is
    /// cancelled or accepting fails.
    /// </summary>
    /// <param name="runnable">Receiver on which <c>ProcessLinesAsync</c> is invoked.</param>
    /// <param name="pipe">Pipe handed to <c>ProcessLinesAsync</c> for every accepted connection.</param>
    /// <param name="token">Cancellation closes the listening socket and ends the accept loop.</param>
    /// <returns>A task that completes when the accept loop has ended.</returns>
    public static async Task Receive(this Pipelines runnable, Pipe pipe, CancellationToken token)
    {
        // Dispose the listener on every exit path, including a failing Bind/Listen.
        using (var listenSocket = new Socket(SocketType.Stream, ProtocolType.Tcp))
        {
            listenSocket.Bind(new IPEndPoint(IPAddress.Loopback, Port));
            Console.WriteLine($"Listening on port {Port}");
            listenSocket.Listen(ListenBacklog);

            // Closing the socket from the cancellation callback makes a pending
            // AcceptAsync fail, which is how the loop below gets unblocked.
            using (token.Register(() => listenSocket.Close()))
            {
                while (!token.IsCancellationRequested)
                {
                    Socket socket;
                    try
                    {
                        socket = await listenSocket.AcceptAsync();
                    }
                    catch (SocketException)
                    {
                        return;
                    }
                    catch (ObjectDisposedException)
                    {
                        // Cancellation closed the socket after the loop condition was
                        // checked but before AcceptAsync was called.
                        return;
                    }

                    // The returned task is deliberately discarded (not awaited here), so
                    // the loop can accept the next connection immediately.
                    // NOTE(review): every connection receives the same Pipe instance;
                    // confirm that sharing is intended.
                    _ = runnable.ProcessLinesAsync(socket, pipe, token);
                }
            }
        }
    }
}