Skip to content

Commit

Permalink
Fix pipeline regressions
Browse files Browse the repository at this point in the history
  • Loading branch information
daxian-dbw committed Apr 8, 2021
1 parent 19abcbe commit 3ba0c31
Showing 1 changed file with 80 additions and 55 deletions.
135 changes: 80 additions & 55 deletions src/System.Management.Automation/engine/pipeline.cs
Expand Up @@ -538,11 +538,8 @@ internal Array SynchronousExecuteEnumerate(object input)
// the call to 'pipelineProcessor.Step'.
// It's possible (though very unlikely) that the call to 'pipelineProcessor.Step' failed with an
// exception, and in such case, the 'pipelineProcessor' would have been disposed, and therefore
// we should skip the 'DoComplete' call on it.
if (!redirectPipelineProcessor._stopping)
{
redirectPipelineProcessor.DoCompleteCore(null);
}
// the call to 'DoComplete' will simply return, because '_commands' was already set to null.
redirectPipelineProcessor.DoCompleteCore(null);
}
}

Expand Down Expand Up @@ -579,10 +576,33 @@ private ExceptionDispatchInfo GetFirstError(RuntimeException e)
return firstError;
}

private void ThrowFirstErrorIfExisting(bool logException)
{
// The main purpose of this method is to check if the pipeline is being stopped asynchronously,
// by 'Ctrl+c' manually or 'PowerShell.Stop' programatically.
// When the pipeline is being stopped, a 'PipelineStoppedException' object will be assigned to
// '_firstTerminatingError' (see 'PipelineProcessor.Stop' for details), and thus the pipeline
// execution needs to check if '_firstTerminatingError' has been set in various places, so as
// to throw out this 'PipelineStoppedException' to terminate the pipeline execution.
if (_firstTerminatingError != null)
{
if (logException)
{
LogExecutionException(_firstTerminatingError.SourceException);
}

_firstTerminatingError.Throw();
}
}

private void DoCompleteCore(CommandProcessorBase commandRequestingUpstreamCommandsToStop)
{
if (_commands is null)
{
// This could happen to a redirection pipeline, either for an expression (e.g. 1 > a.txt)
// or for a command (e.g. command > a.txt).
// An exception may be thrown from the call to 'StartStepping' or 'Step' on the pipeline,
// which causes the pipeline commands to be disposed.
return;
}

Expand Down Expand Up @@ -658,11 +678,18 @@ private void DoCompleteCore(CommandProcessorBase commandRequestingUpstreamComman
}

// If a terminating error occurred, report it now.
if (_firstTerminatingError != null)
{
this.LogExecutionException(_firstTerminatingError.SourceException);
_firstTerminatingError.Throw();
}
// This pipeline could have been stopped asynchronously and we need to check and
// see if that's the case.
// An example:
// - 'Start-Sleep' is running in this pipeline, and 'pipelineProcessor.Stop' gets
// called on a different thread, which sets a 'PipelineStoppedException' object
// to '_firstTerminatingError' and runs 'StopProcessing' on 'Start-Sleep'.
// The 'StopProcessing' will cause 'Start-Sleep' to return from 'ProcessRecord'
// call, and thus the pipeline execution will move forward to run 'DoComplete'
// for the 'Start-Sleep' command and thus the code flow will reach here.
// For this given example, we need to check '_firstTerminatingError' and throw out
// the 'PipelineStoppedException' if the pipeline was indeed being stopped.
ThrowFirstErrorIfExisting(logException: true);
}

/// <summary>
Expand Down Expand Up @@ -731,42 +758,44 @@ private void Clean()
/// <returns>The results of the execution.</returns>
internal Array DoComplete()
{
if (Stopping)
try
{
throw new PipelineStoppedException();
}
if (Stopping)
{
throw new PipelineStoppedException();
}

if (!_executionStarted)
{
throw PSTraceSource.NewInvalidOperationException(
PipelineStrings.PipelineNotStarted);
}
if (!_executionStarted)
{
throw PSTraceSource.NewInvalidOperationException(
PipelineStrings.PipelineNotStarted);
}

ExceptionDispatchInfo toRethrowInfo;
try
{
DoCompleteCore(null);
ExceptionDispatchInfo toRethrowInfo;
try
{
DoCompleteCore(null);
return RetrieveResults();
}
catch (RuntimeException e)
{
toRethrowInfo = GetFirstError(e);
}

return RetrieveResults();
}
catch (RuntimeException e)
{
toRethrowInfo = GetFirstError(e);
// By rethrowing the exception outside of the handler, we allow the CLR on X64/IA64 to free from the stack
// the exception records related to this exception.

// The only reason we should get here is if an exception should be rethrown.
Diagnostics.Assert(toRethrowInfo != null, "Alternate protocol path failure");
toRethrowInfo.Throw();

// UNREACHABLE
return null;
}
finally
{
DisposeCommands();
}

// By rethrowing the exception outside of the handler,
// we allow the CLR on X64/IA64 to free from the stack
// the exception records related to this exception.

// The only reason we should get here is if
// an exception should be rethrown.
Diagnostics.Assert(toRethrowInfo != null, "Alternate protocol path failure");
toRethrowInfo.Throw();
return null; // UNREACHABLE
}

/// <summary>
Expand All @@ -784,10 +813,7 @@ internal void StartStepping(bool expectInput)
Start(expectInput);

// If a terminating error occurred, report it now.
if (_firstTerminatingError != null)
{
_firstTerminatingError.Throw();
}
ThrowFirstErrorIfExisting(logException: false);
}
catch (PipelineStoppedException)
{
Expand All @@ -796,10 +822,7 @@ internal void StartStepping(bool expectInput)
// The error we want to report is the first terminating error
// which occurred during pipeline execution, regardless
// of whether other errors occurred afterward.
if (_firstTerminatingError != null)
{
_firstTerminatingError.Throw();
}
ThrowFirstErrorIfExisting(logException: false);

throw;
}
Expand Down Expand Up @@ -895,10 +918,7 @@ internal Array Step(object input)
Inject(input, enumerate: false);

// If a terminating error occurred, report it now.
if (_firstTerminatingError != null)
{
_firstTerminatingError.Throw();
}
ThrowFirstErrorIfExisting(logException: false);

return RetrieveResults();
}
Expand All @@ -909,10 +929,7 @@ internal Array Step(object input)
// The error we want to report is the first terminating error
// which occurred during pipeline execution, regardless
// of whether other errors occurred afterward.
if (_firstTerminatingError != null)
{
_firstTerminatingError.Throw();
}
ThrowFirstErrorIfExisting(logException: false);

throw;
}
Expand Down Expand Up @@ -1192,6 +1209,14 @@ private void Inject(object input, bool enumerate)
/// </returns>
private Array RetrieveResults()
{
if (_commands is null)
{
// This could happen to an expression redirection pipeline (e.g. 1 > a.txt).
// An exception may be thrown from the call to 'StartStepping' or 'Step' on the pipeline,
// which causes the pipeline commands to be disposed.
return MshCommandRuntime.StaticEmptyArray;
}

// If the error queue has been linked, it's up to the link to
// deal with the output. Don't do anything here...
if (!_linkedErrorOutput)
Expand All @@ -1213,17 +1238,17 @@ private Array RetrieveResults()
// If the success queue has been linked, it's up to the link to
// deal with the output. Don't do anything here...
if (_linkedSuccessOutput)
{
return MshCommandRuntime.StaticEmptyArray;
}

CommandProcessorBase LastCommandProcessor = _commands[_commands.Count - 1];
ValidateCommandProcessorNotNull(LastCommandProcessor, errorMessage: null);

Array results = LastCommandProcessor.CommandRuntime.GetResultsAsArray();

// 2003/10/02-JonN
// Do not return the same results more than once
LastCommandProcessor.CommandRuntime.OutputPipe.Clear();

return results is null ? MshCommandRuntime.StaticEmptyArray : results;
}

Expand Down

0 comments on commit 3ba0c31

Please sign in to comment.