Skip to content

Commit

Permalink
Rewrite NonBlockingPumpReader, fixes #493
Browse files Browse the repository at this point in the history
  • Loading branch information
gnodet committed Jan 7, 2020
1 parent 37ef992 commit c50fe7a
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 98 deletions.
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2002-2016, the original author or authors.
*
* This software is distributable under the BSD license. See the terms of the
* BSD license in the documentation provided with this software.
*
* https://opensource.org/licenses/BSD-3-Clause
*/
package org.jline.terminal.impl;

import java.io.BufferedWriter;
import java.io.IOError;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.Charset;

import org.jline.reader.impl.LineReaderImpl;
import org.jline.terminal.Size;
import org.jline.utils.AnsiWriter;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class AbstractWindowsTerminalTest {

@Test
public void testBracketingPasteSmall() throws Exception {
StringWriter sw = new StringWriter();
TestTerminal terminal = new TestTerminal(sw);
String str = LineReaderImpl.BRACKETED_PASTE_BEGIN + "abcd";
str.chars().forEachOrdered(c -> process(terminal, c));
new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
LineReaderImpl.BRACKETED_PASTE_END.chars().forEachOrdered(c -> process(terminal, c));
"\n".chars().forEachOrdered(c -> process(terminal, c));
}).start();
LineReaderImpl reader = new LineReaderImpl(terminal);
String res = reader.readLine();
assertEquals("abcd", res);
}

@Test
public void testBracketingPasteHuge() throws Exception {
StringWriter sw = new StringWriter();
TestTerminal terminal = new TestTerminal(sw);
new Thread(() -> {
StringBuilder str = new StringBuilder(LineReaderImpl.BRACKETED_PASTE_BEGIN);
for (int i = 0; i < 100000; i++) {
str.append("0123456789");
}
str.append(LineReaderImpl.BRACKETED_PASTE_END);
str.append("\n");
str.toString().chars().forEachOrdered(c -> process(terminal, c));
}).start();
LineReaderImpl reader = new LineReaderImpl(terminal);
String res = reader.readLine();
}

private void process(TestTerminal terminal, int c) {
try {
terminal.processInputChar((char) c);
} catch (IOException e) {
throw new IOError(e);
}
}

private static class TestTerminal extends AbstractWindowsTerminal {
public TestTerminal(StringWriter sw) throws IOException {
super(new AnsiWriter(new BufferedWriter(sw)), "name",
AbstractWindowsTerminal.TYPE_DUMB,
Charset.defaultCharset(), 0,
false, SignalHandler.SIG_DFL);
}

@Override
protected int getConsoleOutputCP() {
return 0;
}

@Override
protected int getConsoleMode() {
return 0;
}

@Override
protected void setConsoleMode(int mode) {
}

@Override
protected boolean processConsoleInput() throws IOException {
return false;
}

@Override
public Size getSize() {
return new Size(10000000, 10000000);
}
}
}
223 changes: 125 additions & 98 deletions terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java
Expand Up @@ -11,15 +11,25 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Writer;
import java.nio.CharBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class NonBlockingPumpReader extends NonBlockingReader {

private static final int DEFAULT_BUFFER_SIZE = 4096;

// Read and write buffer are backed by the same array
private final CharBuffer readBuffer;
private final CharBuffer writeBuffer;
private final char[] buffer;
private int read;
private int write;
private int count;

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

private final Writer writer;

Expand All @@ -30,80 +40,65 @@ public NonBlockingPumpReader() {
}

public NonBlockingPumpReader(int bufferSize) {
char[] buf = new char[bufferSize];
this.readBuffer = CharBuffer.wrap(buf);
this.writeBuffer = CharBuffer.wrap(buf);
this.buffer = new char[bufferSize];
this.writer = new NbpWriter();
// There are no bytes available to read after initialization
readBuffer.limit(0);
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
}

public Writer getWriter() {
return this.writer;
}

private int wait(CharBuffer buffer, long timeout) throws InterruptedIOException {
boolean isInfinite = (timeout <= 0L);
long end = 0;
if (!isInfinite) {
end = System.currentTimeMillis() + timeout;
}
while (!closed && !buffer.hasRemaining() && (isInfinite || timeout > 0L)) {
// Wake up waiting readers/writers
notifyAll();
try {
wait(timeout);
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (!isInfinite) {
timeout = end - System.currentTimeMillis();
}
}
return closed
? EOF
: buffer.hasRemaining()
? 0
: READ_EXPIRED;
}

private static boolean rewind(CharBuffer buffer, CharBuffer other) {
// Extend limit of other buffer if there is additional input/output available
if (buffer.position() > other.position()) {
other.limit(buffer.position());
}
// If we have reached the end of the buffer, rewind and set the new limit
if (buffer.position() == buffer.capacity()) {
buffer.rewind();
buffer.limit(other.position());
return true;
} else {
return false;
}
}

@Override
public synchronized boolean ready() {
return readBuffer.hasRemaining();
public boolean ready() {
return available() > 0;
}

public synchronized int available() {
int count = readBuffer.remaining();
if (writeBuffer.position() < readBuffer.position()) {
count += writeBuffer.position();
public int available() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
return count;
}

@Override
protected synchronized int read(long timeout, boolean isPeek) throws IOException {
// Blocks until more input is available or the reader is closed.
int res = wait(readBuffer, timeout);
if (res >= 0) {
res = isPeek ? readBuffer.get(readBuffer.position()) : readBuffer.get();
protected int read(long timeout, boolean isPeek) throws IOException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// Blocks until more input is available or the reader is closed.
if (!closed && count == 0) {
try {
notEmpty.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
}
}
if (closed) {
return EOF;
} else if (count == 0) {
return READ_EXPIRED;
} else {
if (isPeek) {
return buffer[read];
} else {
int res = buffer[read];
if (++read == buffer.length) {
read = 0;
}
--count;
notFull.signal();
return res;
}
}
} finally {
lock.unlock();
}
rewind(readBuffer, writeBuffer);
return res;
}

@Override
Expand All @@ -113,50 +108,83 @@ public int readBuffered(char[] b) throws IOException {
} else if (b.length == 0) {
return 0;
} else {
int r = Math.min(b.length, readBuffer.remaining());
if (r > 0) {
readBuffer.get(b);
return r;
} else {
r = read(-1, false);
if (r >= 0) {
b[0] = (char) r;
return 1;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (!closed && count == 0) {
try {
notEmpty.await();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
}
}
if (closed) {
return EOF;
} else if (count == 0) {
return READ_EXPIRED;
} else {
int r = Math.min(b.length, count);
for (int i = 0; i < r; i++) {
b[i] = buffer[read++];
if (read == buffer.length) {
read = 0;
}
}
count -= r;
notFull.signal();
return r;
}
return r;
} finally {
lock.unlock();
}
}
}

synchronized void write(char[] cbuf, int off, int len) throws IOException {
while (len > 0) {
// Blocks until there is new space available for buffering or the
// reader is closed.
if (wait(writeBuffer, 0L) == EOF) {
throw new ClosedException();
void write(char[] cbuf, int off, int len) throws IOException {
if (len > 0) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (len > 0) {
// Blocks until there is new space available for buffering or the
// reader is closed.
if (!closed && count == buffer.length) {
try {
notFull.await();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
}
}
if (closed) {
throw new IOException("Closed");
}
while (len > 0 && count < buffer.length) {
buffer[write++] = cbuf[off++];
count++;
len--;
if (write == buffer.length) {
write = 0;
}
}
notEmpty.signal();
}
} finally {
lock.unlock();
}
// Copy as much characters as we can
int count = Math.min(len, writeBuffer.remaining());
writeBuffer.put(cbuf, off, count);
off += count;
len -= count;
// Update buffer states and rewind if necessary
rewind(writeBuffer, readBuffer);
}
}

synchronized void flush() {
// Avoid waking up readers when there is nothing to read
if (readBuffer.hasRemaining()) {
// Notify readers
notifyAll();
}
}

@Override
public synchronized void close() throws IOException {
this.closed = true;
notifyAll();
public void close() throws IOException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
this.closed = true;
this.notEmpty.signalAll();
this.notFull.signalAll();
} finally {
lock.unlock();
}
}

private class NbpWriter extends Writer {
Expand All @@ -168,7 +196,6 @@ public void write(char[] cbuf, int off, int len) throws IOException {

@Override
public void flush() throws IOException {
NonBlockingPumpReader.this.flush();
}

@Override
Expand Down

0 comments on commit c50fe7a

Please sign in to comment.