Skip to content

Commit

Permalink
Merge pull request #1259 from baranowb/UNDERTOW-1980
Browse files Browse the repository at this point in the history
[UNDERTOW-1980] add local vars and sync to handle read source in Abst…
  • Loading branch information
fl4via committed Nov 19, 2021
2 parents abbe58e + 9941c88 commit ab59c62
Showing 1 changed file with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,13 @@ public InetSocketAddress getDestinationAddress() {
* of calling this method then it can prevent frame channels for being fully consumed.
*/
public synchronized R receive() throws IOException {
ReferenceCountedPooled pooled = this.readData;
if (readChannelDone && receiver == null) {
//we have received the last frame, we just shut down and return
//it would probably make more sense to have the last channel responsible for this
//however it is much simpler just to have it here
if(readData != null) {
readData.close();
if(pooled != null) {
pooled.close();
readData = null;
}
channel.getSourceChannel().suspendReads();
Expand All @@ -358,7 +359,6 @@ public synchronized R receive() throws IOException {
partialRead = false;
boolean requiresReinvoke = false;
int reinvokeDataRemaining = 0;
ReferenceCountedPooled pooled = this.readData;
boolean hasData = false;
if (pooled == null) {
pooled = allocateReferenceCountedBuffer();
Expand Down Expand Up @@ -501,9 +501,9 @@ public synchronized R receive() throws IOException {
}
}
if(requiresReinvoke) {
if(readData != null && !readData.isFreed()) {
if(readData.getBuffer().remaining() == reinvokeDataRemaining) {
readData.close();
if(pooled != null && !pooled.isFreed()) {
if(pooled.getBuffer().remaining() == reinvokeDataRemaining) {
pooled.close();
readData = null;
UndertowLogger.REQUEST_IO_LOGGER.debugf("Partial message read before connection close %s", this);
}
Expand Down Expand Up @@ -798,7 +798,9 @@ public synchronized void resumeReceives() {
}

private void doResume() {
if (readData != null && !readData.isFreed()) {
//NOTE: this should not require syncing with below part
final ReferenceCountedPooled localReadData = this.readData;
if (localReadData != null && !localReadData.isFreed()) {
channel.getSourceChannel().wakeupReads();
} else {
channel.getSourceChannel().resumeReads();
Expand All @@ -813,14 +815,15 @@ public boolean isReceivesResumed() {
* Forcibly closes the {@link io.undertow.server.protocol.framed.AbstractFramedChannel}.
*/
@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
if (UndertowLogger.REQUEST_IO_LOGGER.isTraceEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.tracef(new ClosedChannelException(), "Channel %s is being closed", this);
}
safeClose(channel);
if (readData != null) {
readData.close();
readData = null;
final ReferenceCountedPooled localReadData = this.readData;
if (localReadData != null) {
localReadData.close();
this.readData = null;
}
closeSubChannels();
}
Expand Down Expand Up @@ -955,7 +958,8 @@ public void handleEvent(final StreamSourceChannel channel) {
synchronized (AbstractFramedChannel.this) {
partialRead = AbstractFramedChannel.this.partialRead;
}
if (readData != null && !readData.isFreed() && channel.isOpen() && !partialRead) {
final ReferenceCountedPooled localReadData = readData;
if (localReadData != null && !localReadData.isFreed() && channel.isOpen() && !partialRead) {
try {
runInIoThread(new Runnable() {
@Override
Expand Down Expand Up @@ -1010,15 +1014,17 @@ public void run() {
} else if(c instanceof StreamSourceChannel) {
sourceClosed = true;
}

final ReferenceCountedPooled localReadData = readData;
if(!sourceClosed || !sinkClosed) {
return; //both sides need to be closed
} else if(readData != null && !readData.isFreed()) {
} else if(localReadData != null && !localReadData.isFreed()) {
//we make sure there is no data left to receive, if there is then we invoke the receive listener
runInIoThread(new Runnable() {
@Override
public void run() {
while (readData != null && !readData.isFreed()) {
int rem = readData.getBuffer().remaining();
while (localReadData != null && !localReadData.isFreed()) {
int rem = localReadData.getBuffer().remaining();
ChannelListener listener = receiveSetter.get();
if(listener == null) {
listener = DRAIN_LISTENER;
Expand All @@ -1027,7 +1033,7 @@ public void run() {
if(!AbstractFramedChannel.this.isOpen()) {
break;
}
if (readData != null && rem == readData.getBuffer().remaining()) {
if (localReadData != null && rem == localReadData.getBuffer().remaining()) {
break;//make sure we are making progress
}
}
Expand Down Expand Up @@ -1077,8 +1083,8 @@ public void run() {
} finally {
synchronized (AbstractFramedChannel.this) {
closeSubChannels();
if (readData != null) {
readData.close();
if (localReadData != null) {
localReadData.close();
readData = null;
}
}
Expand Down

0 comments on commit ab59c62

Please sign in to comment.