Skip to content

Commit

Permalink
Add Channel#bufferAllocator() (#11651)
Browse files Browse the repository at this point in the history
__Motivation__

As we start to migrate codecs to use the new `Buffer` API, we need a way for them to get a handle of `BufferAllocator`.

__Modification__

Added `bufferAllocator()` method to `ChannelConfig`, `Channel` and `ChannelHandlerContext`

__Result__

Codecs can allocate `Buffer` instances
  • Loading branch information
Nitesh Kant committed Sep 3, 2021
1 parent e97cb12 commit 683ff42
Show file tree
Hide file tree
Showing 37 changed files with 380 additions and 5 deletions.
@@ -0,0 +1,83 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.buffer.api;

import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.Locale;
import java.util.function.Supplier;

import static io.netty.buffer.api.BufferAllocator.offHeapPooled;
import static io.netty.buffer.api.BufferAllocator.offHeapUnpooled;
import static io.netty.buffer.api.BufferAllocator.onHeapPooled;
import static io.netty.buffer.api.BufferAllocator.onHeapUnpooled;
import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
import static io.netty.util.internal.PlatformDependent.directBufferPreferred;
import static java.lang.Runtime.getRuntime;

/**
* A {@link BufferAllocator} which is {@link #close() disposed} when the {@link Runtime} is shutdown.
*/
public final class DefaultGlobalBufferAllocator implements BufferAllocator {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultGlobalBufferAllocator.class);
public static final BufferAllocator DEFAUL_GLOBAL_BUFFER_ALLOCATOR;

static {
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();

BufferAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = directBufferPreferred() ? offHeapUnpooled() : onHeapUnpooled();
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = directBufferPreferred() ? offHeapPooled() : onHeapPooled();
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = directBufferPreferred() ? offHeapPooled() : onHeapPooled();
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
DEFAUL_GLOBAL_BUFFER_ALLOCATOR = new DefaultGlobalBufferAllocator(alloc);
}

private final BufferAllocator delegate;

private DefaultGlobalBufferAllocator(BufferAllocator delegate) {
this.delegate = checkNotNullWithIAE(delegate, "delegate");
getRuntime().addShutdownHook(new Thread(this.delegate::close));
}

@Override
public Buffer allocate(int size) {
return delegate.allocate(size);
}

@Override
public Supplier<Buffer> constBufferSupplier(byte[] bytes) {
return delegate.constBufferSupplier(bytes);
}

/**
* @throws UnsupportedOperationException Close is not supported as this is a shared allocator.
*/
@Override
public void close() {
throw new UnsupportedOperationException("Global buffer allocator can not be closed explicitly.");
}
}
Expand Up @@ -16,6 +16,7 @@
package io.netty.handler.codec.http;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -139,6 +140,11 @@ public ByteBufAllocator alloc() {
return ctx.alloc();
}

@Override
public BufferAllocator bufferAllocator() {
return ctx.bufferAllocator();
}

@Deprecated
public <T> Attribute<T> attr(AttributeKey<T> key) {
return ctx.attr(key);
Expand Down
Expand Up @@ -18,6 +18,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -213,6 +214,11 @@ public ByteBufAllocator alloc() {
return channel.alloc();
}

@Override
public BufferAllocator bufferAllocator() {
return channel.bufferAllocator();
}

@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
return channel.attr(key);
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -207,6 +208,11 @@ public ByteBufAllocator alloc() {
return ctx.alloc();
}

@Override
public BufferAllocator bufferAllocator() {
return ctx.bufferAllocator();
}

@Override
@Deprecated
public <T> Attribute<T> attr(AttributeKey<T> key) {
Expand Down
Expand Up @@ -20,6 +20,7 @@
import io.netty.buffer.ByteBufConvertible;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
Expand Down Expand Up @@ -626,6 +627,11 @@ public ByteBufAllocator alloc() {
return ctx.alloc();
}

@Override
public BufferAllocator bufferAllocator() {
return ctx.bufferAllocator();
}

@Override
@Deprecated
public <T> Attribute<T> attr(AttributeKey<T> key) {
Expand Down
Expand Up @@ -15,6 +15,7 @@
package io.netty.microbench.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -37,11 +38,22 @@ public abstract class EmbeddedChannelHandlerContext implements ChannelHandlerCon
private final EventLoop eventLoop;
private final Channel channel;
private final ByteBufAllocator alloc;
private final BufferAllocator bufferAllocator;
private final ChannelHandler handler;
private SocketAddress localAddress;

protected EmbeddedChannelHandlerContext(ByteBufAllocator alloc, ChannelHandler handler, EmbeddedChannel channel) {
this.alloc = requireNonNull(alloc, "alloc");
this.bufferAllocator = null;
this.channel = requireNonNull(channel, "channel");
this.handler = requireNonNull(handler, "handler");
eventLoop = requireNonNull(channel.executor(), "eventLoop");
}

protected EmbeddedChannelHandlerContext(BufferAllocator bufferAllocator, ChannelHandler handler,
EmbeddedChannel channel) {
this.bufferAllocator = requireNonNull(bufferAllocator, "bufferAllocator");
this.alloc = null;
this.channel = requireNonNull(channel, "channel");
this.handler = requireNonNull(handler, "handler");
eventLoop = requireNonNull(channel.executor(), "eventLoop");
Expand Down Expand Up @@ -244,6 +256,11 @@ public final ByteBufAllocator alloc() {
return alloc;
}

@Override
public BufferAllocator bufferAllocator() {
return bufferAllocator;
}

@Override
public final Promise<Void> newPromise() {
return channel().newPromise();
Expand Down
Expand Up @@ -16,6 +16,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
Expand All @@ -28,13 +29,25 @@ public abstract class EmbeddedChannelWriteAccumulatingHandlerContext extends Emb
private final ByteToMessageDecoder.Cumulator cumulator;

protected EmbeddedChannelWriteAccumulatingHandlerContext(ByteBufAllocator alloc, ChannelHandler handler,
ByteToMessageDecoder.Cumulator writeCumulator) {
ByteToMessageDecoder.Cumulator writeCumulator) {
this(alloc, handler, writeCumulator, new EmbeddedChannel());
}

protected EmbeddedChannelWriteAccumulatingHandlerContext(ByteBufAllocator alloc, ChannelHandler handler,
ByteToMessageDecoder.Cumulator writeCumulator,
EmbeddedChannel channel) {
ByteToMessageDecoder.Cumulator writeCumulator,
EmbeddedChannel channel) {
super(alloc, handler, channel);
cumulator = requireNonNull(writeCumulator, "writeCumulator");
}

protected EmbeddedChannelWriteAccumulatingHandlerContext(BufferAllocator alloc, ChannelHandler handler,
ByteToMessageDecoder.Cumulator writeCumulator) {
this(alloc, handler, writeCumulator, new EmbeddedChannel());
}

protected EmbeddedChannelWriteAccumulatingHandlerContext(BufferAllocator alloc, ChannelHandler handler,
ByteToMessageDecoder.Cumulator writeCumulator,
EmbeddedChannel channel) {
super(alloc, handler, channel);
cumulator = requireNonNull(writeCumulator, "writeCumulator");
}
Expand Down
Expand Up @@ -15,6 +15,7 @@
package io.netty.microbench.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCounted;
Expand All @@ -29,6 +30,14 @@ protected EmbeddedChannelWriteReleaseHandlerContext(ByteBufAllocator alloc, Chan
EmbeddedChannel channel) {
super(alloc, handler, channel);
}
protected EmbeddedChannelWriteReleaseHandlerContext(BufferAllocator alloc, ChannelHandler handler) {
this(alloc, handler, new EmbeddedChannel());
}

protected EmbeddedChannelWriteReleaseHandlerContext(BufferAllocator alloc, ChannelHandler handler,
EmbeddedChannel channel) {
super(alloc, handler, channel);
}

@Override
protected abstract void handleException(Throwable t);
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
Expand Down Expand Up @@ -164,7 +165,12 @@ protected final SslHandler newServerHandler(ByteBufAllocator allocator) {
private static final class SslThroughputBenchmarkHandlerContext extends
EmbeddedChannelWriteAccumulatingHandlerContext {
SslThroughputBenchmarkHandlerContext(ByteBufAllocator alloc, ChannelHandler handler,
ByteToMessageDecoder.Cumulator writeCumulator) {
ByteToMessageDecoder.Cumulator writeCumulator) {
super(alloc, handler, writeCumulator);
}

SslThroughputBenchmarkHandlerContext(BufferAllocator alloc, ChannelHandler handler,
ByteToMessageDecoder.Cumulator writeCumulator) {
super(alloc, handler, writeCumulator);
}

Expand Down
Expand Up @@ -17,6 +17,8 @@

import io.netty.buffer.ByteBufAllocator;

import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
Expand Down Expand Up @@ -56,6 +58,12 @@ public EpollChannelConfig setAllocator(ByteBufAllocator allocator) {
return this;
}

@Override
public EpollChannelConfig setBufferAllocator(BufferAllocator bufferAllocator) {
super.setBufferAllocator(bufferAllocator);
return this;
}

@Override
public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
Expand Down Expand Up @@ -220,6 +221,12 @@ public EpollDatagramChannelConfig setAllocator(ByteBufAllocator allocator) {
return this;
}

@Override
public EpollDatagramChannelConfig setBufferAllocator(BufferAllocator bufferAllocator) {
super.setBufferAllocator(bufferAllocator);
return this;
}

@Override
public EpollDatagramChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
Expand Down
Expand Up @@ -16,6 +16,7 @@
package io.netty.channel.epoll;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
Expand Down Expand Up @@ -96,6 +97,12 @@ public EpollDomainDatagramChannelConfig setAllocator(ByteBufAllocator allocator)
return this;
}

@Override
public EpollDomainDatagramChannelConfig setBufferAllocator(BufferAllocator bufferAllocator) {
super.setBufferAllocator(bufferAllocator);
return this;
}

@Override
public EpollDomainDatagramChannelConfig setAutoClose(boolean autoClose) {
super.setAutoClose(autoClose);
Expand Down
Expand Up @@ -18,6 +18,7 @@
import static java.util.Objects.requireNonNull;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
Expand Down Expand Up @@ -113,6 +114,12 @@ public EpollDomainSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
return this;
}

@Override
public EpollDomainSocketChannelConfig setBufferAllocator(BufferAllocator bufferAllocator) {
super.setBufferAllocator(bufferAllocator);
return this;
}

@Override
public EpollDomainSocketChannelConfig setAutoClose(boolean autoClose) {
super.setAutoClose(autoClose);
Expand Down
Expand Up @@ -16,6 +16,7 @@
package io.netty.channel.epoll;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
Expand Down Expand Up @@ -94,6 +95,12 @@ public EpollDuplexChannelConfig setAllocator(ByteBufAllocator allocator) {
return this;
}

@Override
public EpollDuplexChannelConfig setBufferAllocator(BufferAllocator bufferAllocator) {
super.setBufferAllocator(bufferAllocator);
return this;
}

@Override
public EpollDuplexChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
Expand Down

0 comments on commit 683ff42

Please sign in to comment.