Skip to content

Commit

Permalink
Reduce InnerFlatMap allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
jponge committed Sep 30, 2021
1 parent 9991f17 commit 3bc9b47
Showing 1 changed file with 7 additions and 7 deletions.
Expand Up @@ -2,9 +2,7 @@

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -566,7 +564,9 @@ static final class FlatMapInner<O> implements Subscription, MultiSubscriber<O> {

final int limit;

final AtomicReference<Subscription> subscription = new AtomicReference<>();
volatile Subscription subscription = null;
static final AtomicReferenceFieldUpdater<FlatMapInner, Subscription> SUBSCRIPTION_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(FlatMapInner.class, Subscription.class, "subscription");

long produced;

Expand All @@ -585,7 +585,7 @@ static final class FlatMapInner<O> implements Subscription, MultiSubscriber<O> {
@Override
public void onSubscribe(Subscription s) {
Objects.requireNonNull(s);
if (subscription.compareAndSet(null, s)) {
if (SUBSCRIPTION_UPDATER.compareAndSet(this, null, s)) {
s.request(Subscriptions.unboundedOrRequests(requests));
}
}
Expand Down Expand Up @@ -613,7 +613,7 @@ public void request(long n) {
long p = produced + n;
if (p >= limit) {
produced = 0L;
subscription.get().request(p);
subscription.request(p);
} else {
produced = p;
}
Expand All @@ -626,7 +626,7 @@ public void cancel() {

public void cancel(boolean doNotCancel) {
if (!doNotCancel) {
Subscription last = subscription.getAndSet(Subscriptions.CANCELLED);
Subscription last = SUBSCRIPTION_UPDATER.getAndSet(this, Subscriptions.CANCELLED);
if (last != null) {
last.cancel();
}
Expand Down

0 comments on commit 3bc9b47

Please sign in to comment.