diff --git a/reader/src/test/java/org/jline/terminal/impl/AbstractWindowsTerminalTest.java b/reader/src/test/java/org/jline/terminal/impl/AbstractWindowsTerminalTest.java new file mode 100644 index 000000000..376b098ea --- /dev/null +++ b/reader/src/test/java/org/jline/terminal/impl/AbstractWindowsTerminalTest.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2002-2016, the original author or authors. + * + * This software is distributable under the BSD license. See the terms of the + * BSD license in the documentation provided with this software. + * + * https://opensource.org/licenses/BSD-3-Clause + */ +package org.jline.terminal.impl; + +import java.io.BufferedWriter; +import java.io.IOError; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.charset.Charset; + +import org.jline.reader.impl.LineReaderImpl; +import org.jline.terminal.Size; +import org.jline.utils.AnsiWriter; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class AbstractWindowsTerminalTest { + + @Test + public void testBracketingPasteSmall() throws Exception { + StringWriter sw = new StringWriter(); + TestTerminal terminal = new TestTerminal(sw); + String str = LineReaderImpl.BRACKETED_PASTE_BEGIN + "abcd"; + str.chars().forEachOrdered(c -> process(terminal, c)); + new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + LineReaderImpl.BRACKETED_PASTE_END.chars().forEachOrdered(c -> process(terminal, c)); + "\n".chars().forEachOrdered(c -> process(terminal, c)); + }).start(); + LineReaderImpl reader = new LineReaderImpl(terminal); + String res = reader.readLine(); + assertEquals("abcd", res); + } + + @Test + public void testBracketingPasteHuge() throws Exception { + StringWriter sw = new StringWriter(); + TestTerminal terminal = new TestTerminal(sw); + new Thread(() -> { + StringBuilder str = new StringBuilder(LineReaderImpl.BRACKETED_PASTE_BEGIN); + for (int i = 0; i < 100000; i++) { + str.append("0123456789"); + } + str.append(LineReaderImpl.BRACKETED_PASTE_END); + str.append("\n"); + str.toString().chars().forEachOrdered(c -> process(terminal, c)); + }).start(); + LineReaderImpl reader = new LineReaderImpl(terminal); + String res = reader.readLine(); + } + + private void process(TestTerminal terminal, int c) { + try { + terminal.processInputChar((char) c); + } catch (IOException e) { + throw new IOError(e); + } + } + + private static class TestTerminal extends AbstractWindowsTerminal { + public TestTerminal(StringWriter sw) throws IOException { + super(new AnsiWriter(new BufferedWriter(sw)), "name", + AbstractWindowsTerminal.TYPE_DUMB, + Charset.defaultCharset(), 0, + false, SignalHandler.SIG_DFL); + } + + @Override + protected int getConsoleOutputCP() { + return 0; + } + + @Override + protected int getConsoleMode() { + return 0; + } + + @Override + protected void setConsoleMode(int mode) { + } + + @Override + protected boolean processConsoleInput() throws IOException { + return false; + } + + @Override + public Size getSize() { + return new Size(10000000, 10000000); + } + } +} diff --git a/terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java b/terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java index d9e91750e..378b18ff4 100644 --- a/terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java +++ b/terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java @@ -11,15 +11,25 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.Writer; -import java.nio.CharBuffer; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; public class NonBlockingPumpReader extends NonBlockingReader { private static final int DEFAULT_BUFFER_SIZE = 4096; - // Read and write buffer are backed by the same array - private final CharBuffer readBuffer; - private final CharBuffer writeBuffer; + private final char[] buffer; + private int read; + private int write; + private int count; + + /** Main lock guarding all access */ + final ReentrantLock lock; + /** Condition for waiting takes */ + private final Condition notEmpty; + /** Condition for waiting puts */ + private final Condition notFull; private final Writer writer; @@ -30,80 +40,65 @@ public NonBlockingPumpReader() { } public NonBlockingPumpReader(int bufferSize) { - char[] buf = new char[bufferSize]; - this.readBuffer = CharBuffer.wrap(buf); - this.writeBuffer = CharBuffer.wrap(buf); + this.buffer = new char[bufferSize]; this.writer = new NbpWriter(); - // There are no bytes available to read after initialization - readBuffer.limit(0); + this.lock = new ReentrantLock(); + this.notEmpty = lock.newCondition(); + this.notFull = lock.newCondition(); } public Writer getWriter() { return this.writer; } - private int wait(CharBuffer buffer, long timeout) throws InterruptedIOException { - boolean isInfinite = (timeout <= 0L); - long end = 0; - if (!isInfinite) { - end = System.currentTimeMillis() + timeout; - } - while (!closed && !buffer.hasRemaining() && (isInfinite || timeout > 0L)) { - // Wake up waiting readers/writers - notifyAll(); - try { - wait(timeout); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - if (!isInfinite) { - timeout = end - System.currentTimeMillis(); - } - } - return closed - ? EOF - : buffer.hasRemaining() - ? 0 - : READ_EXPIRED; - } - - private static boolean rewind(CharBuffer buffer, CharBuffer other) { - // Extend limit of other buffer if there is additional input/output available - if (buffer.position() > other.position()) { - other.limit(buffer.position()); - } - // If we have reached the end of the buffer, rewind and set the new limit - if (buffer.position() == buffer.capacity()) { - buffer.rewind(); - buffer.limit(other.position()); - return true; - } else { - return false; - } - } - @Override - public synchronized boolean ready() { - return readBuffer.hasRemaining(); + public boolean ready() { + return available() > 0; } - public synchronized int available() { - int count = readBuffer.remaining(); - if (writeBuffer.position() < readBuffer.position()) { - count += writeBuffer.position(); + public int available() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return count; + } finally { + lock.unlock(); } - return count; } @Override - protected synchronized int read(long timeout, boolean isPeek) throws IOException { - // Blocks until more input is available or the reader is closed. - int res = wait(readBuffer, timeout); - if (res >= 0) { - res = isPeek ? readBuffer.get(readBuffer.position()) : readBuffer.get(); + protected int read(long timeout, boolean isPeek) throws IOException { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // Blocks until more input is available or the reader is closed. + if (!closed && count == 0) { + try { + notEmpty.await(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + if (closed) { + return EOF; + } else if (count == 0) { + return READ_EXPIRED; + } else { + if (isPeek) { + return buffer[read]; + } else { + int res = buffer[read]; + if (++read == buffer.length) { + read = 0; + } + --count; + notFull.signal(); + return res; + } + } + } finally { + lock.unlock(); } - rewind(readBuffer, writeBuffer); - return res; } @Override @@ -113,50 +108,83 @@ public int readBuffered(char[] b) throws IOException { } else if (b.length == 0) { return 0; } else { - int r = Math.min(b.length, readBuffer.remaining()); - if (r > 0) { - readBuffer.get(b); - return r; - } else { - r = read(-1, false); - if (r >= 0) { - b[0] = (char) r; - return 1; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (!closed && count == 0) { + try { + notEmpty.await(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + if (closed) { + return EOF; + } else if (count == 0) { + return READ_EXPIRED; + } else { + int r = Math.min(b.length, count); + for (int i = 0; i < r; i++) { + b[i] = buffer[read++]; + if (read == buffer.length) { + read = 0; + } + } + count -= r; + notFull.signal(); + return r; } - return r; + } finally { + lock.unlock(); } } } - synchronized void write(char[] cbuf, int off, int len) throws IOException { - while (len > 0) { - // Blocks until there is new space available for buffering or the - // reader is closed. - if (wait(writeBuffer, 0L) == EOF) { - throw new ClosedException(); + void write(char[] cbuf, int off, int len) throws IOException { + if (len > 0) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + while (len > 0) { + // Blocks until there is new space available for buffering or the + // reader is closed. + if (!closed && count == buffer.length) { + try { + notFull.await(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + if (closed) { + throw new IOException("Closed"); + } + while (len > 0 && count < buffer.length) { + buffer[write++] = cbuf[off++]; + count++; + len--; + if (write == buffer.length) { + write = 0; + } + } + notEmpty.signal(); + } + } finally { + lock.unlock(); } - // Copy as much characters as we can - int count = Math.min(len, writeBuffer.remaining()); - writeBuffer.put(cbuf, off, count); - off += count; - len -= count; - // Update buffer states and rewind if necessary - rewind(writeBuffer, readBuffer); - } - } - - synchronized void flush() { - // Avoid waking up readers when there is nothing to read - if (readBuffer.hasRemaining()) { - // Notify readers - notifyAll(); } } @Override - public synchronized void close() throws IOException { - this.closed = true; - notifyAll(); + public void close() throws IOException { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + this.closed = true; + this.notEmpty.signalAll(); + this.notFull.signalAll(); + } finally { + lock.unlock(); + } } private class NbpWriter extends Writer { @@ -168,7 +196,6 @@ public void write(char[] cbuf, int off, int len) throws IOException { @Override public void flush() throws IOException { - NonBlockingPumpReader.this.flush(); } @Override