/
NodeEndpointOutOfProcBase.cs
642 lines (555 loc) · 26.5 KB
/
NodeEndpointOutOfProcBase.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
#if CLR2COMPATIBILITY
using Microsoft.Build.Shared.Concurrent;
#else
using System.Collections.Concurrent;
#endif
using System.IO;
using System.IO.Pipes;
using System.Threading;
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
#if FEATURE_SECURITY_PERMISSIONS || FEATURE_PIPE_SECURITY
using System.Security.AccessControl;
#endif
#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR
using System.Security.Principal;
#endif
#if !FEATURE_APM
using System.Threading.Tasks;
#endif
namespace Microsoft.Build.BackEnd
{
/// <summary>
/// This is an implementation of INodeEndpoint for the out-of-proc nodes. It acts only as a client.
/// </summary>
internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint
{
#region Private Data
#if NETCOREAPP2_1 || MONO
/// <summary>
/// Timeout passed to the handshake reads performed by the packet pump on NETCOREAPP2_1 / MONO builds.
/// </summary>
/// <remarks>
/// NOTE(review): presumably milliseconds (i.e. 60 seconds) - confirm against ReadIntForHandshake.
/// </remarks>
private const int ClientConnectTimeout = 60000;
#endif // NETCOREAPP2_1 || MONO
/// <summary>
/// The size (131072 = 128 * 1024) requested for both the input and the output buffer of the named pipe.
/// </summary>
private const int PipeBufferSize = 131072;
/// <summary>
/// Flag indicating if we should debug communications or not.
/// Set in InternalConstruct from the MSBUILDDEBUGCOMM environment variable; not read anywhere else in this class.
/// </summary>
private bool _debugCommunications = false;
/// <summary>
/// The current communication status of the node.
/// </summary>
private LinkStatus _status;
/// <summary>
/// The named pipe server stream created in InternalConstruct. The packet pump waits on it for
/// the host to connect and then uses it for both reading and writing packets.
/// </summary>
private NamedPipeServerStream _pipeServer;
// The following private data fields are used only when the endpoint is in ASYNCHRONOUS mode.
/// <summary>
/// Object used as a lock source for the async data.
/// It is locked while the packet pump thread, its events and the packet queue are created.
/// </summary>
private object _asyncDataMonitor;
/// <summary>
/// Set when a packet is available in the packet queue
/// </summary>
private AutoResetEvent _packetAvailable;
/// <summary>
/// Set when the asynchronous packet pump should terminate
/// </summary>
private AutoResetEvent _terminatePacketPump;
/// <summary>
/// The thread which runs the asynchronous packet pump
/// </summary>
private Thread _packetPump;
/// <summary>
/// The factory used to create and route packets.
/// </summary>
private INodePacketFactory _packetFactory;
/// <summary>
/// The asynchronous packet queue.
/// </summary>
/// <remarks>
/// Packets are enqueued by EnqueuePacket (reached through SendData) and dequeued by the packet
/// pump thread. The queue is a ConcurrentQueue, and the code in this class accesses it without
/// taking any additional lock.
/// </remarks>
private ConcurrentQueue<INodePacket> _packetQueue;
/// <summary>
/// Per-node shared read buffer.
/// Created in InternalConstruct and handed to the read translator for every incoming packet.
/// </summary>
private SharedReadBuffer _sharedReadBuffer;
#endregion
#region INodeEndpoint Events
/// <summary>
/// Fired each time the link status of this endpoint is changed to a new value.
/// </summary>
public event LinkStatusChangedDelegate OnLinkStatusChanged;
#endregion
#region INodeEndpoint Properties
/// <summary>
/// Gets the current link status of this endpoint.
/// </summary>
public LinkStatus LinkStatus => _status;
#endregion
#region Properties
#endregion
#region INodeEndpoint Methods
/// <summary>
/// Puts this endpoint into listening mode: remembers the packet factory and starts the
/// packet pump thread, which waits for the remote endpoint to connect.
/// </summary>
/// <param name="factory">The factory used to create packets.</param>
public void Listen(INodePacketFactory factory)
{
    // Listening may only begin while the link is inactive.
    bool linkIsInactive = _status == LinkStatus.Inactive;
    ErrorUtilities.VerifyThrow(linkIsInactive, "Link not inactive. Status is {0}", _status);
    ErrorUtilities.VerifyThrowArgumentNull(factory, nameof(factory));

    _packetFactory = factory;

    InitializeAsyncPacketThread();
}
/// <summary>
/// Not supported by the out-of-proc endpoint; calling it always raises an internal error.
/// </summary>
/// <param name="factory">The factory used to create packets (unused).</param>
public void Connect(INodePacketFactory factory) =>
    ErrorUtilities.ThrowInternalError("Connect() not valid on the out of proc endpoint.");
/// <summary>
/// Shuts down the link by stopping the packet pump and disposing the pipe.
/// </summary>
public void Disconnect() => InternalDisconnect();
/// <summary>
/// Queues a packet for transmission to the peer endpoint.
/// The packet is dropped when the link is not currently active.
/// </summary>
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
    // PERF: Set up a priority system so logging packets are sent only when all other packet types have been sent.
    if (_status != LinkStatus.Active)
    {
        return;
    }

    EnqueuePacket(packet);
}
#endregion
#region Construction
/// <summary>
/// Initializes the endpoint state and creates the named pipe server stream the host will connect to.
/// </summary>
/// <param name="pipeName">The name of the pipe to create.</param>
internal void InternalConstruct(string pipeName)
{
    ErrorUtilities.VerifyThrowArgumentLength(pipeName, nameof(pipeName));

    string debugCommSetting = Environment.GetEnvironmentVariable("MSBUILDDEBUGCOMM");
    _debugCommunications = debugCommSetting == "1";
    _status = LinkStatus.Inactive;
    _asyncDataMonitor = new object();
    _sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();

    // Both pipe flavours below are created with the same options.
    const PipeOptions pipeOptions = PipeOptions.Asynchronous | PipeOptions.WriteThrough;

#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR
    if (!NativeMethodsShared.IsMono)
    {
        SecurityIdentifier owner = WindowsIdentity.GetCurrent().Owner;
        PipeSecurity pipeSecurity = new PipeSecurity();

        // Restrict access to just this account. We set the owner specifically here, and on the
        // pipe client side they will check the owner against this one - they must have identical
        // SIDs or the client will reject this server. This is used to avoid attacks where a
        // hacked server creates a less restricted pipe in an attempt to lure us into using it and
        // then sending build requests to the real pipe client (which is the MSBuild Build Manager.)
        pipeSecurity.AddAccessRule(new PipeAccessRule(owner, PipeAccessRights.ReadWrite, AccessControlType.Allow));
        pipeSecurity.SetOwner(owner);

        _pipeServer = new NamedPipeServerStream(
            pipeName,
            PipeDirection.InOut,
            1, // Only allow one connection at a time.
            PipeTransmissionMode.Byte,
            pipeOptions,
            PipeBufferSize, // Input buffer size
            PipeBufferSize, // Output buffer size
            pipeSecurity,
            HandleInheritability.None);
        return;
    }
#endif

    // No pipe security available (or running on Mono): create the pipe without an explicit ACL.
    _pipeServer = new NamedPipeServerStream(
        pipeName,
        PipeDirection.InOut,
        1, // Only allow one connection at a time.
        PipeTransmissionMode.Byte,
        pipeOptions,
        PipeBufferSize, // Input buffer size
        PipeBufferSize); // Output buffer size
}
#endregion
/// <summary>
/// Returns the host handshake for this node endpoint
/// </summary>
/// <remarks>
/// The packet pump calls this once a host has connected and compares each value returned by
/// RetrieveHandshakeComponents() with the corresponding int read from the pipe; any mismatch
/// rejects that host.
/// </remarks>
protected abstract Handshake GetHandshake();
/// <summary>
/// Moves the link to a new status and notifies the OnLinkStatusChanged subscribers.
/// It is an error to request the status the link already has.
/// </summary>
/// <param name="newStatus">The status the node should now be in.</param>
protected void ChangeLinkStatus(LinkStatus newStatus)
{
    bool isRealTransition = _status != newStatus;
    ErrorUtilities.VerifyThrow(isRealTransition, "Attempting to change status to existing status {0}.", _status);

    string previousName = _status.ToString();
    string requestedName = newStatus.ToString();
    CommunicationsUtilities.Trace("Changing link status from {0} to {1}", previousName, requestedName);

    _status = newStatus;
    RaiseLinkStatusChanged(_status);
}
/// <summary>
/// Raises OnLinkStatusChanged, if anyone is subscribed, using a local copy of the delegate
/// so that a concurrent unsubscribe cannot null it out between the check and the call.
/// </summary>
/// <param name="newStatus">The new status of the endpoint link.</param>
private void RaiseLinkStatusChanged(LinkStatus newStatus)
{
    LinkStatusChangedDelegate subscribers = OnLinkStatusChanged;
    if (subscribers != null)
    {
        subscribers(this, newStatus);
    }
}
#region Private Methods
/// <summary>
/// Performs the actual disconnect: signals the packet pump to terminate, waits for it to exit,
/// releases the terminate event and the pipe, and finally marks the link as inactive.
/// Must not be called from the packet pump thread itself.
/// </summary>
private void InternalDisconnect()
{
    Thread pumpThread = _packetPump;
    bool calledFromOtherThread = pumpThread.ManagedThreadId != Thread.CurrentThread.ManagedThreadId;
    ErrorUtilities.VerifyThrow(calledFromOtherThread, "Can't join on the same thread.");

    // Ask the pump to stop and wait until it has done so.
    AutoResetEvent terminateSignal = _terminatePacketPump;
    terminateSignal.Set();
    pumpThread.Join();

#if CLR2COMPATIBILITY
    terminateSignal.Close();
#else
    terminateSignal.Dispose();
#endif
    _pipeServer.Dispose();
    _packetPump = null;

    ChangeLinkStatus(LinkStatus.Inactive);
}
#region Asynchronous Mode Methods
/// <summary>
/// Places a packet on the outgoing queue and signals the packet pump that work is available.
/// </summary>
/// <param name="packet">The packet to be transmitted.</param>
private void EnqueuePacket(INodePacket packet)
{
    ErrorUtilities.VerifyThrowArgumentNull(packet, nameof(packet));

    ConcurrentQueue<INodePacket> outgoing = _packetQueue;
    ErrorUtilities.VerifyThrow(outgoing != null, "packetQueue is null");

    AutoResetEvent packetSignal = _packetAvailable;
    ErrorUtilities.VerifyThrow(packetSignal != null, "packetAvailable is null");

    outgoing.Enqueue(packet);
    packetSignal.Set();
}
/// <summary>
/// Creates the packet pump thread, the events it waits on and the outgoing packet queue,
/// then starts the thread. Everything is set up under the async data lock.
/// </summary>
private void InitializeAsyncPacketThread()
{
    lock (_asyncDataMonitor)
    {
        _packetPump = new Thread(PacketPumpProc)
        {
            IsBackground = true,
            Name = "OutOfProc Endpoint Packet Pump",
        };

        _packetAvailable = new AutoResetEvent(false);
        _terminatePacketPump = new AutoResetEvent(false);
        _packetQueue = new ConcurrentQueue<INodePacket>();

        // The events and the queue must exist before the pump starts, because it captures them on entry.
        _packetPump.Start();
    }
}
/// <summary>
/// Body of the packet pump thread. It waits for a host to connect to the pipe, performs the
/// handshake, and once a valid host is connected marks the link Active and runs the read/write
/// loop. When that loop exits, the pipe is drained (on Windows) and disconnected.
/// </summary>
/// <remarks>
/// A host that fails the handshake - or, when FEATURE_SECURITY_PERMISSIONS is defined, turns out
/// to run as a different user - is disconnected, and the pump waits for another host using
/// whatever remains of CommunicationsUtilities.NodeConnectionTimeout. If no host connects in
/// time, the link status becomes ConnectionFailed; any other non-critical exception sets it to
/// Failed. In both cases the thread exits.
/// </remarks>
private void PacketPumpProc()
{
    // Capture the shared state once; everything below works on these locals only.
    NamedPipeServerStream localPipeServer = _pipeServer;
    AutoResetEvent localPacketAvailable = _packetAvailable;
    AutoResetEvent localTerminatePacketPump = _terminatePacketPump;
    ConcurrentQueue<INodePacket> localPacketQueue = _packetQueue;

    DateTime originalWaitStartTime = DateTime.UtcNow;
    bool gotValidConnection = false;
    while (!gotValidConnection)
    {
        gotValidConnection = true;
        DateTime restartWaitTime = DateTime.UtcNow;

        // We only want to wait the difference between now and the original start time, in case we have multiple hosts attempting
        // to attach. This prevents each attempt from resetting the timer.
        TimeSpan usedWaitTime = restartWaitTime - originalWaitStartTime;
        int waitTimeRemaining = Math.Max(0, CommunicationsUtilities.NodeConnectionTimeout - (int)usedWaitTime.TotalMilliseconds);

        try
        {
            // Wait for a connection
#if FEATURE_APM
            IAsyncResult resultForConnection = localPipeServer.BeginWaitForConnection(null, null);
            CommunicationsUtilities.Trace("Waiting for connection {0} ms...", waitTimeRemaining);
            bool connected = resultForConnection.AsyncWaitHandle.WaitOne(waitTimeRemaining, false);
#else
            Task connectionTask = localPipeServer.WaitForConnectionAsync();
            CommunicationsUtilities.Trace("Waiting for connection {0} ms...", waitTimeRemaining);
            bool connected = connectionTask.Wait(waitTimeRemaining);
#endif
            if (!connected)
            {
                CommunicationsUtilities.Trace("Connection timed out waiting a host to contact us. Exiting comm thread.");
                ChangeLinkStatus(LinkStatus.ConnectionFailed);
                return;
            }

            CommunicationsUtilities.Trace("Parent started connecting. Reading handshake from parent");
#if FEATURE_APM
            localPipeServer.EndWaitForConnection(resultForConnection);
#endif

            // The handshake protocol is a series of int exchanges. The host sends us each component, and we
            // verify it. Afterwards, the host sends an "End of Handshake" signal, to which we respond in kind.
            // Once the handshake is complete, both sides can be assured the other is ready to accept data.
            Handshake handshake = GetHandshake();
            try
            {
                int[] handshakeComponents = handshake.RetrieveHandshakeComponents();
                for (int i = 0; i < handshakeComponents.Length; i++)
                {
                    int handshakePart = localPipeServer.ReadIntForHandshake(
                        i == 0 ? (byte?)CommunicationsUtilities.handshakeVersion : null /* this will disconnect a < 16.8 host; it expects leading 00 or F5 or 06. 0x00 is a wildcard */
#if NETCOREAPP2_1 || MONO
                        , ClientConnectTimeout /* wait a long time for the handshake from this side */
#endif
                        );

                    if (handshakePart != handshakeComponents[i])
                    {
                        CommunicationsUtilities.Trace("Handshake failed. Received {0} from host not {1}. Probably the host is a different MSBuild build.", handshakePart, handshakeComponents[i]);

                        // Tell the host which component (1-based) did not match.
                        localPipeServer.WriteIntForHandshake(i + 1);
                        gotValidConnection = false;
                        break;
                    }
                }

                if (gotValidConnection)
                {
                    // To ensure that our handshake and theirs have the same number of bytes, receive and send a magic number indicating EOS.
#if NETCOREAPP2_1 || MONO
                    localPipeServer.ReadEndOfHandshakeSignal(false, ClientConnectTimeout); /* wait a long time for the handshake from this side */
#else
                    localPipeServer.ReadEndOfHandshakeSignal(false);
#endif
                    CommunicationsUtilities.Trace("Successfully connected to parent.");
                    localPipeServer.WriteEndOfHandshakeSignal();

#if FEATURE_SECURITY_PERMISSIONS
                    // We will only talk to a host that was started by the same user as us. Even though the pipe access is set to only allow this user, we want to ensure they
                    // haven't attempted to change those permissions out from under us. This ensures that the only way they can truly gain access is to be impersonating the
                    // user we were started by.
                    WindowsIdentity currentIdentity = WindowsIdentity.GetCurrent();
                    WindowsIdentity clientIdentity = null;
                    localPipeServer.RunAsClient(delegate () { clientIdentity = WindowsIdentity.GetCurrent(true); });

                    if (clientIdentity == null || !String.Equals(clientIdentity.Name, currentIdentity.Name, StringComparison.OrdinalIgnoreCase))
                    {
                        CommunicationsUtilities.Trace("Handshake failed. Host user is {0} but we were created by {1}.", (clientIdentity == null) ? "<unknown>" : clientIdentity.Name, currentIdentity.Name);

                        // Deliberately no 'continue' here: we must fall through to the disconnect below.
                        // Restarting the loop with the rejected host still connected would make the next
                        // wait-for-connection call throw and end the pump with a Failed status.
                        gotValidConnection = false;
                    }
#endif
                }
            }
            catch (IOException e)
            {
                // We will get here when:
                // 1. The host (OOP main node) connects to us, it immediately checks for user privileges
                //    and if they don't match it disconnects immediately leaving us still trying to read the blank handshake
                // 2. The host is too old sending us bits we automatically reject in the handshake
                // 3. We expected to read the EndOfHandshake signal, but we received something else
                CommunicationsUtilities.Trace("Client connection failed but we will wait for another connection. Exception: {0}", e.Message);
                gotValidConnection = false;
            }
            catch (InvalidOperationException e)
            {
                // Treated like a failed handshake: drop this host and wait for another one.
                CommunicationsUtilities.Trace("Client connection was in an invalid state; we will wait for another connection. Exception: {0}", e.Message);
                gotValidConnection = false;
            }

            if (!gotValidConnection)
            {
                // Release the rejected host so that the pipe can accept the next connection attempt.
                if (localPipeServer.IsConnected)
                {
                    localPipeServer.Disconnect();
                }

                continue;
            }

            ChangeLinkStatus(LinkStatus.Active);
        }
        catch (Exception e)
        {
            if (ExceptionHandling.IsCriticalException(e))
            {
                throw;
            }

            CommunicationsUtilities.Trace("Client connection failed. Exiting comm thread. {0}", e);
            if (localPipeServer.IsConnected)
            {
                localPipeServer.Disconnect();
            }

            ExceptionHandling.DumpExceptionToFile(e);
            ChangeLinkStatus(LinkStatus.Failed);
            return;
        }
    }

    RunReadLoop(
        new BufferedReadStream(localPipeServer),
        localPipeServer,
        localPacketQueue, localPacketAvailable, localTerminatePacketPump);

    CommunicationsUtilities.Trace("Ending read loop");

    try
    {
        if (localPipeServer.IsConnected)
        {
#if NETCOREAPP // OperatingSystem.IsWindows() is new in .NET 5.0
            if (OperatingSystem.IsWindows())
#endif
            {
                localPipeServer.WaitForPipeDrain();
            }

            localPipeServer.Disconnect();
        }
    }
    catch (Exception)
    {
        // We don't really care if Disconnect somehow fails, but it gives us a chance to do the right thing.
    }
}
/// <summary>
/// Runs the main communication loop of the packet pump: reads incoming packets from
/// <paramref name="localReadPipe"/> and routes them through the packet factory, and writes queued
/// outgoing packets to <paramref name="localWritePipe"/>. Returns when the terminate event is
/// signalled or when reading, deserializing or writing fails.
/// </summary>
/// <param name="localReadPipe">The stream packets are read from.</param>
/// <param name="localWritePipe">The stream packets are written to.</param>
/// <param name="localPacketQueue">The queue of packets waiting to be sent.</param>
/// <param name="localPacketAvailable">Signalled when a packet has been added to the queue.</param>
/// <param name="localTerminatePacketPump">Signalled when the loop should flush the queue and exit.</param>
private void RunReadLoop(Stream localReadPipe, Stream localWritePipe,
ConcurrentQueue<INodePacket> localPacketQueue, AutoResetEvent localPacketAvailable, AutoResetEvent localTerminatePacketPump)
{
// Ordering of the wait handles is important. The first signalled wait handle in the array
// will be returned by WaitAny if multiple wait handles are signalled. The handles are ordered
// read-completed, packet-available, terminate, so a pending read or a pending send is always
// serviced before a terminate request.
CommunicationsUtilities.Trace("Entering read loop.");
// Packet header: byte 0 is the packet type; the remaining 4 bytes are the length field the
// sender writes (see the write path below). Only byte 0 is inspected by this loop.
byte[] headerByte = new byte[5];
#if FEATURE_APM
IAsyncResult result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
Task<int> readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
#endif
bool exitLoop = false;
do
{
// Ordering is important. We want packetAvailable to supercede terminate otherwise we will not properly wait for all
// packets to be sent by other threads which are shutting down, such as the logging thread.
WaitHandle[] handles = new WaitHandle[] {
#if FEATURE_APM
result.AsyncWaitHandle,
#else
((IAsyncResult)readTask).AsyncWaitHandle,
#endif
localPacketAvailable, localTerminatePacketPump };
int waitId = WaitHandle.WaitAny(handles);
switch (waitId)
{
// The pending header read has completed.
case 0:
{
int bytesRead = 0;
try
{
#if FEATURE_APM
bytesRead = localReadPipe.EndRead(result);
#else
bytesRead = readTask.Result;
#endif
}
catch (Exception e)
{
// Lost communications. Abort (but allow node reuse)
CommunicationsUtilities.Trace("Exception reading from server. {0}", e);
ExceptionHandling.DumpExceptionToFile(e);
ChangeLinkStatus(LinkStatus.Inactive);
exitLoop = true;
break;
}
if (bytesRead != headerByte.Length)
{
// Incomplete read. Abort.
if (bytesRead == 0)
{
CommunicationsUtilities.Trace("Parent disconnected abruptly");
}
else
{
CommunicationsUtilities.Trace("Incomplete header read from server. {0} of {1} bytes read", bytesRead, headerByte.Length);
}
ChangeLinkStatus(LinkStatus.Failed);
exitLoop = true;
break;
}
NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);
try
{
_packetFactory.DeserializeAndRoutePacket(0, packetType, BinaryTranslator.GetReadTranslator(localReadPipe, _sharedReadBuffer));
}
catch (Exception e)
{
// Error while deserializing or handling packet. Abort.
CommunicationsUtilities.Trace("Exception while deserializing packet {0}: {1}", packetType, e);
ExceptionHandling.DumpExceptionToFile(e);
ChangeLinkStatus(LinkStatus.Failed);
exitLoop = true;
break;
}
// Start reading the header of the next packet.
#if FEATURE_APM
result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length);
#endif
}
break;
// Packet-available (1) and terminate (2) share the same code so that everything still in
// the queue is written out before a terminate request is honoured.
case 1:
case 2:
try
{
// Write out all the queued packets.
INodePacket packet;
while (localPacketQueue.TryDequeue(out packet))
{
MemoryStream packetStream = new MemoryStream();
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(packetStream);
packetStream.WriteByte((byte)packet.Type);
// Pad for packet length
packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);
// Serialize the packet body after the 5-byte header placeholder.
packet.Translate(writeTranslator);
// Now write in the actual packet length
packetStream.Position = 1;
packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);
localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
}
}
catch (Exception e)
{
// Error while serializing or writing a packet. Abort.
CommunicationsUtilities.Trace("Exception while serializing packets: {0}", e);
ExceptionHandling.DumpExceptionToFile(e);
ChangeLinkStatus(LinkStatus.Failed);
exitLoop = true;
break;
}
if (waitId == 2)
{
// The terminate event is set by InternalDisconnect, which changes the status to Inactive
// after this thread has exited; the status is therefore set to Failed (not Inactive) here.
CommunicationsUtilities.Trace("Disconnecting voluntarily");
ChangeLinkStatus(LinkStatus.Failed);
exitLoop = true;
}
break;
default:
ErrorUtilities.ThrowInternalError("waitId {0} out of range.", waitId);
break;
}
}
while (!exitLoop);
}
#endregion
#endregion
}
}