From bdbed965a92ee6e3bc31cbfa4ee98285b72c45c5 Mon Sep 17 00:00:00 2001 From: Thomas Wolf Date: Wed, 2 Nov 2022 11:02:38 +0100 Subject: [PATCH] [GH-263] Fix race condition in BufferedIoOutputStream Don't try to write a future that's already done in startWrite(). Just skip it and try the next one, if any. Fixes #263. Bug: https://github.com/apache/mina-sshd/issues/263 --- .../channel/BufferedIoOutputStream.java | 62 ++++++++++++------- .../channel/ChannelAsyncOutputStream.java | 4 +- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java index 1e3a23c35..d024f3b04 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java @@ -152,38 +152,56 @@ protected void waitForAvailableWriteSpace(int requiredSize) throws IOException { } } - protected void startWriting() throws IOException { - IoWriteFutureImpl future = writes.peek(); - // No more pending requests - if (future == null) { - return; - } + private IoWriteFutureImpl getWriteRequest() { + IoWriteFutureImpl future = null; + while (future == null) { + future = writes.peek(); + // No more pending requests + if (future == null) { + return null; + } + + // Don't try to write any further if pending exception signaled + Throwable pendingError = pendingException.get(); + if (pendingError != null) { + log.error("startWriting({})[{}] propagate to {} write requests pending error={}[{}]", getId(), out, + writes.size(), getClass().getSimpleName(), pendingError.getMessage()); - // Don't try to write any further if pending exception signaled - Throwable pendingError = pendingException.get(); - if (pendingError != null) { - log.error("startWriting({})[{}] propagate to {} write requests pending error={}[{}]", - getId(), out, writes.size(), getClass().getSimpleName(), pendingError.getMessage()); + IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null); + for (IoWriteFutureImpl pendingWrite : writes) { + // Checking reference by design + if (UnaryEquator.isSameReference(pendingWrite, currentFuture)) { + continue; // will be taken care of when its listener is eventually called + } - IoWriteFutureImpl currentFuture = currentWrite.getAndSet(null); - for (IoWriteFutureImpl pendingWrite : writes) { - // Checking reference by design - if (UnaryEquator.isSameReference(pendingWrite, currentFuture)) { - continue; // will be taken care of when its listener is eventually called + future.setValue(pendingError); } - future.setValue(pendingError); + writes.clear(); + return null; } - writes.clear(); - return; + // Cannot honor this request yet since other pending one incomplete + if (!currentWrite.compareAndSet(null, future)) { + return null; + } + + if (future.isDone()) { + // A write was on-going, and finishWrite hadn't removed the future yet when we got it + // above. See https://github.com/apache/mina-sshd/issues/263 . + // Re-try. + currentWrite.set(null); + future = null; + } } + return future; + } - // Cannot honor this request yet since other pending one incomplete - if (!currentWrite.compareAndSet(null, future)) { + protected void startWriting() throws IOException { + IoWriteFutureImpl future = getWriteRequest(); + if (future == null) { return; } - Buffer buffer = future.getBuffer(); int bufferSize = buffer.available(); out.writeBuffer(buffer).addListener(f -> { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java index e9ac8dfb3..48fef0918 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java @@ -294,7 +294,9 @@ protected IoWriteFutureImpl writePacket(IoWriteFutureImpl future, boolean resume if (chunkLength < stillToSend && !(f instanceof BufferedFuture)) { // We can send only part of the data remaining: copy the buffer (if it hasn't been copied before) because // the original may be re-used, then send the bit we can send, and queue up a future for sending the rest. - f = new BufferedFuture(future.getId(), new ByteArrayBuffer(buffer.getCompactData())); + Buffer copied = new ByteArrayBuffer(stillToSend); + copied.putBuffer(buffer, false); + f = new BufferedFuture(future.getId(), copied); f.addListener(w -> future.setValue(w.getException() != null ? w.getException() : w.isWritten())); } if (chunkLength <= 0) {