Skip to content

Commit

Permalink
[MSHARED-617] StreamFeeder should flush OutputStream
Browse files Browse the repository at this point in the history
[MSHARED-619] StreamFeeder silently ignores exceptions.
[MSHARED-620] CommandLineCallable should defer starting threads until called.
[MSHARED-621] CommandLineCallable should calculate process timeouts using 'System.nanoTime' instead of 'System.currentTimeMillis'.
[MSHARED-622] CommandLineCallable silently ignores exceptions thrown from the stdin processor (StreemFeeder).

o CommandLineCallable silently ignores exception thrown from the stdout/stderr processors (StreamPumper).
o Bumped to next minor version due to addition of new public method in 'StreamFeeder'.
  • Loading branch information
ChristianSchulte committed Feb 26, 2017
1 parent 0cd0352 commit 21cedc9
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 104 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ limitations under the License.
</parent>

<artifactId>plexus-utils</artifactId>
<version>3.0.25-SNAPSHOT</version>
<version>3.1.0-SNAPSHOT</version>

<name>Plexus Common Utilities</name>
<description>A collection of various utility classes to ease working with strings, files, command lines, XML and
Expand Down
243 changes: 176 additions & 67 deletions src/main/java/org/codehaus/plexus/util/cli/CommandLineUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.Vector;

import org.codehaus.plexus.util.Os;
import org.codehaus.plexus.util.StringUtils;

Expand All @@ -33,9 +33,16 @@
*/
public abstract class CommandLineUtils
{

/**
* A {@code StreamConsumer} providing consumed lines as a {@code String}.
*
* @see #getOutput()
*/
public static class StringStreamConsumer
implements StreamConsumer
{

private StringBuffer string = new StringBuffer();

private String ls = System.getProperty( "line.separator" );
Expand All @@ -49,24 +56,18 @@ public String getOutput()
{
return string.toString();
}
}

private static class ProcessHook extends Thread {
private final Process process;

private ProcessHook( Process process )
{
super("CommandlineUtils process shutdown hook");
this.process = process;
this.setContextClassLoader( null );
}

public void run()
{
process.destroy();
}
}

/**
* Number of milliseconds per second.
*/
private static final long MILLIS_PER_SECOND = 1000L;

/**
* Number of nanoseconds per second.
*/
private static final long NANOS_PER_SECOND = 1000000000L;

public static int executeCommandLine( Commandline cl, StreamConsumer systemOut, StreamConsumer systemErr )
throws CommandLineException
Expand Down Expand Up @@ -120,10 +121,11 @@ public static int executeCommandLine( Commandline cl, InputStream systemIn, Stre
* @throws CommandLineException or CommandLineTimeOutException if time out occurs
* @noinspection ThrowableResultOfMethodCallIgnored
*/
public static CommandLineCallable executeCommandLineAsCallable( final Commandline cl, final InputStream systemIn,
final StreamConsumer systemOut,
final StreamConsumer systemErr,
final int timeoutInSeconds )
public static CommandLineCallable executeCommandLineAsCallable( final Commandline cl,
final InputStream systemIn,
final StreamConsumer systemOut,
final StreamConsumer systemErr,
final int timeoutInSeconds )
throws CommandLineException
{
if ( cl == null )
Expand All @@ -133,108 +135,215 @@ public static CommandLineCallable executeCommandLineAsCallable( final Commandlin

final Process p = cl.execute();

final StreamFeeder inputFeeder = systemIn != null ?
new StreamFeeder( systemIn, p.getOutputStream() ) : null;

final StreamPumper outputPumper = new StreamPumper( p.getInputStream(), systemOut );

final StreamPumper errorPumper = new StreamPumper( p.getErrorStream(), systemErr );

if ( inputFeeder != null )
final Thread processHook = new Thread()
{
inputFeeder.start();
}

outputPumper.start();
{
this.setName( "CommandLineUtils process shutdown hook" );
this.setContextClassLoader( null );
}

errorPumper.start();
@Override
public void run()
{
p.destroy();
}

final ProcessHook processHook = new ProcessHook( p );
};

ShutdownHookUtils.addShutDownHook( processHook );

return new CommandLineCallable()
{

public Integer call()
throws CommandLineException
{
StreamFeeder inputFeeder = null;
StreamPumper outputPumper = null;
StreamPumper errorPumper = null;
boolean success = false;
try
{
if ( systemIn != null )
{
inputFeeder = new StreamFeeder( systemIn, p.getOutputStream() );
inputFeeder.start();
}

outputPumper = new StreamPumper( p.getInputStream(), systemOut );
outputPumper.start();

errorPumper = new StreamPumper( p.getErrorStream(), systemErr );
errorPumper.start();

int returnValue;
if ( timeoutInSeconds <= 0 )
{
returnValue = p.waitFor();
}
else
{
long now = System.currentTimeMillis();
long timeoutInMillis = 1000L * timeoutInSeconds;
long finish = now + timeoutInMillis;
while ( isAlive( p ) && ( System.currentTimeMillis() < finish ) )
final long now = System.nanoTime();
final long timeout = now + NANOS_PER_SECOND * timeoutInSeconds;

while ( isAlive( p ) && ( System.nanoTime() < timeout ) )
{
Thread.sleep( 10 );
// The timeout is specified in seconds. Therefore we must not sleep longer than one second
// but we should sleep as long as possible to reduce the number of iterations performed.
Thread.sleep( MILLIS_PER_SECOND - 1L );
}

if ( isAlive( p ) )
{
throw new InterruptedException( "Process timeout out after " + timeoutInSeconds + " seconds" );
throw new InterruptedException( String.format( "Process timed out after %d seconds.",
timeoutInSeconds ) );
}

returnValue = p.exitValue();
}

waitForAllPumpers( inputFeeder, outputPumper, errorPumper );

if ( outputPumper.getException() != null )
// TODO Find out if waitUntilDone needs to be called using a try-finally construct. The method may throw an
// InterruptedException so that calls to waitUntilDone may be skipped.
// try
// {
// if ( inputFeeder != null )
// {
// inputFeeder.waitUntilDone();
// }
// }
// finally
// {
// try
// {
// outputPumper.waitUntilDone();
// }
// finally
// {
// errorPumper.waitUntilDone();
// }
// }
if ( inputFeeder != null )
{
throw new CommandLineException( "Error inside systemOut parser", outputPumper.getException() );
inputFeeder.waitUntilDone();
}

if ( errorPumper.getException() != null )
outputPumper.waitUntilDone();
errorPumper.waitUntilDone();

if ( inputFeeder != null )
{
throw new CommandLineException( "Error inside systemErr parser", errorPumper.getException() );
inputFeeder.close();
handleException( inputFeeder, "stdin" );
}

outputPumper.close();
handleException( outputPumper, "stdout" );

errorPumper.close();
handleException( errorPumper, "stderr" );

success = true;
return returnValue;
}
catch ( InterruptedException ex )
{
if ( inputFeeder != null )
{
inputFeeder.disable();
}
outputPumper.disable();
errorPumper.disable();
throw new CommandLineTimeOutException( "Error while executing external command, process killed.", ex );
throw new CommandLineTimeOutException( "Error while executing external command, process killed.",
ex );

}
finally
{
ShutdownHookUtils.removeShutdownHook( processHook );

processHook.run();

if ( inputFeeder != null )
{
inputFeeder.close();
inputFeeder.disable();
}
if ( outputPumper != null )
{
outputPumper.disable();
}
if ( errorPumper != null )
{
errorPumper.disable();
}

outputPumper.close();

errorPumper.close();
try
{
ShutdownHookUtils.removeShutdownHook( processHook );
processHook.run();
}
finally
{
try
{
if ( inputFeeder != null )
{
inputFeeder.close();

if ( success )
{
success = false;
handleException( inputFeeder, "stdin" );
success = true; // Only reached when no exception has been thrown.
}
}
}
finally
{
try
{
if ( outputPumper != null )
{
outputPumper.close();

if ( success )
{
success = false;
handleException( outputPumper, "stdout" );
success = true; // Only reached when no exception has been thrown.
}
}
}
finally
{
if ( errorPumper != null )
{
errorPumper.close();

if ( success )
{
handleException( errorPumper, "stderr" );
}
}
}
}
}
}
}

};
}

private static void waitForAllPumpers( StreamFeeder inputFeeder, StreamPumper outputPumper,
StreamPumper errorPumper )
throws InterruptedException
private static void handleException( final StreamPumper streamPumper, final String streamName )
throws CommandLineException
{
if ( inputFeeder != null )
if ( streamPumper.getException() != null )
{
inputFeeder.waitUntilDone();
throw new CommandLineException( String.format( "Failure processing %s.", streamName ),
streamPumper.getException() );

}
}

outputPumper.waitUntilDone();
errorPumper.waitUntilDone();
private static void handleException( final StreamFeeder streamFeeder, final String streamName )
throws CommandLineException
{
if ( streamFeeder.getException() != null )
{
throw new CommandLineException( String.format( "Failure processing %s.", streamName ),
streamFeeder.getException() );

}
}

/**
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/codehaus/plexus/util/cli/DefaultConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,24 @@
* limitations under the License.
*/

import java.io.IOException;

/**
* @author <a href="mailto:evenisse@apache.org">Emmanuel Venisse</a>
* @version $Id$
*/
public class DefaultConsumer
implements StreamConsumer
{
public void consumeLine( String line )

public void consumeLine( String line ) throws IOException
{
System.out.println( line );

if ( System.out.checkError() )
{
throw new IOException( String.format( "Failure printing line '%s' to stdout.", line ) );
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
********************************************************************************/

import java.io.IOException;

/**
* Works in concert with the StreamPumper class to
* allow implementations to gain access to the lines being
Expand All @@ -69,6 +71,8 @@ public interface StreamConsumer
{
/**
* Called when the StreamPumper pumps a line from the Stream.
* @param line The line to be consumed.
* @throws IOException if consuming {@code line} fails.
*/
public void consumeLine( String line );
public void consumeLine( String line ) throws IOException;
}

0 comments on commit 21cedc9

Please sign in to comment.