Skip to content

Commit

Permalink
Merge branch '0.9.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jun 29, 2020
2 parents 82c3394 + 5e4bc03 commit 02eaf62
Showing 1 changed file with 56 additions and 43 deletions.
99 changes: 56 additions & 43 deletions src/main/java/reactor/netty/channel/FluxReceive.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package reactor.netty.channel;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand All @@ -34,7 +35,6 @@
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

import static reactor.netty.ReactorNetty.format;

Expand All @@ -60,10 +60,19 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable
Throwable inboundError;

volatile Disposable receiverCancel;
volatile int wip;

final static AtomicIntegerFieldUpdater<FluxReceive> WIP = AtomicIntegerFieldUpdater.newUpdater
(FluxReceive.class, "wip");
volatile int once;
static final AtomicIntegerFieldUpdater<FluxReceive> ONCE =
AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once");

// Please note, in this specific case WIP is non-volatile since all operation that
// involves work-in-progress pattern is within Netty Event-Loops which guarantees
// serial, thread-safe behaviour.
// However, we need that flag in order to preserve work-in-progress guarding that
// prevents stack overflow in case of onNext -> request -> onNext cycling on the
// same stack
int wip;


FluxReceive(ChannelOperations<?, ?> parent) {

Expand Down Expand Up @@ -131,11 +140,42 @@ public void request(long n) {

@Override
public void subscribe(CoreSubscriber<? super Object> s) {
if (eventLoop.inEventLoop()){
startReceiver(s);
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "Subscribing inbound receiver [pending: {}, cancelled:{}, " +
"inboundDone: {}]"),
getPending(),
isCancelled(),
inboundDone);
}
if (inboundDone && getPending() == 0) {
if (inboundError != null) {
Operators.error(s, inboundError);
return;
}

Operators.complete(s);
return;
}

receiver = s;

s.onSubscribe(this);
}
else {
eventLoop.execute(() -> startReceiver(s));
if (inboundDone && getPending() == 0) {
if (inboundError != null) {
Operators.error(s, inboundError);
return;
}

Operators.complete(s);
}
else {
Operators.error(s,
new IllegalStateException(
"Only one connection receive subscriber allowed."));
}
}
}

Expand Down Expand Up @@ -164,7 +204,8 @@ final void cleanQueue(@Nullable Queue<Object> q){
}

final void drainReceiver() {
if(WIP.getAndIncrement(this) != 0){
// general protect against stackoverflow onNext -> request -> onNext
if (wip++ != 0) {
return;
}
int missed = 1;
Expand All @@ -185,7 +226,7 @@ final void drainReceiver() {
}
return;
}
missed = WIP.addAndGet(this, -missed);
missed = (wip -= missed);
if(missed == 0){
break;
}
Expand Down Expand Up @@ -250,7 +291,7 @@ final void drainReceiver() {
channel.config()
.setAutoRead(true);
}
missed = WIP.addAndGet(this, -missed);
missed = (wip -= missed);
if(missed == 0){
break;
}
Expand All @@ -269,43 +310,13 @@ else if (!needRead) {
.setAutoRead(false);
}

missed = WIP.addAndGet(this, -missed);
missed = (wip -= missed);
if(missed == 0){
break;
}
}
}

final void startReceiver(CoreSubscriber<? super Object> s) {
if (receiver == null) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "Subscribing inbound receiver [pending: {}, cancelled:{}, " +
"inboundDone: {}]"),
getPending(),
isCancelled(),
inboundDone);
}
if (inboundDone && getPending() == 0) {
if (inboundError != null) {
Operators.error(s, inboundError);
return;
}

Operators.complete(s);
return;
}

receiver = s;

s.onSubscribe(this);
}
else {
Operators.error(s,
new IllegalStateException(
"Only one connection receive subscriber allowed."));
}
}

final void onInboundNext(Object msg) {
if (inboundDone || isCancelled()) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -336,8 +347,10 @@ else if (msg instanceof ByteBufHolder){
else {
Queue<Object> q = receiverQueue;
if (q == null) {
q = Queues.unbounded()
.get();
// please note, in that case we are using non-thread safe, simple
// ArrayDeque since all modifications on this queue happens withing
// Netty Event Loop
q = new ArrayDeque<>();
receiverQueue = q;
}
if (log.isDebugEnabled()){
Expand Down

0 comments on commit 02eaf62

Please sign in to comment.