diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 56f825959f1..a7433f6b582 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.core; +import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -66,6 +67,29 @@ public interface Channel { */ boolean send(Packet packet); + /** + * Sends a packet on this channel. + * + * @param packet the packet to send + * @param callback callback after send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean send(Packet packet, Callback callback); + + /** + * Sends a packet and file on this channel. + * + * @param packet the packet to send + * @param fileChannel the file channel retrieved from raf + * @param offset the position of the raf + * @param dataSize the data size to send + * @param callback callback after send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean send(Packet packet, FileChannel fileChannel, long offset, int dataSize, Callback callback); + /** * Sends a packet on this channel. * @@ -247,4 +271,8 @@ public interface Channel { * @param transferring whether the channel is transferring */ void setTransferring(boolean transferring); + + interface Callback { + void done(boolean success); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index 377b1b59016..cf35216d3b0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -134,11 +134,9 @@ default boolean isVersionNewFQQN() { ActiveMQPrincipal getDefaultActiveMQPrincipal(); /** - * - * @param size size we are trying to write * @param timeout * @return * @throws IllegalStateException if the connection is closed */ - boolean blockUntilWritable(int size, long timeout); + boolean blockUntilWritable(long timeout); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 26c405c0134..f9bbd66d40a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -1017,13 +1017,14 @@ private int sendSessionSendContinuationMessage(Channel channel, final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout()); final long startFlowControl = System.nanoTime(); try { - final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis); + final boolean isWritable = connection.blockUntilWritable(blockingCallTimeoutMillis); if (!isWritable) { final long endFlowControl = System.nanoTime(); final long elapsedFlowControl = endFlowControl - startFlowControl; final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl); ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage(); - logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]"); + logger.debugf("try to write %d bytes after blocked %d ms on a not writable connection: [%s]", + expectedEncodeSize, elapsedMillis, connection.getID()); } if (requiresResponse) { // When sending it blocking, only the last chunk will be blocking. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 154ab8aa809..1370968f421 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import java.nio.channels.FileChannel; import java.util.EnumSet; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -25,6 +26,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; @@ -39,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.ConcurrentUtil; import org.jboss.logging.Logger; @@ -148,6 +151,11 @@ public ChannelImpl(final CoreRemotingConnection connection, } this.interceptors = interceptors; + //zero copy transfer is initialized only for replication channels + if (id == CHANNEL_ID.REPLICATION.id && connection.getTransportConnection() instanceof NettyConnection) { + final NettyConnection nettyConnection = (NettyConnection) connection.getTransportConnection(); + nettyConnection.initializeZeroCopyTransfer(); + } } @Override @@ -274,73 +282,134 @@ private void waitForFailOver(String timeoutMsg) { } } - // This must never called by more than one thread concurrently - private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { - if (invokeInterceptors(packet, interceptors, connection) != null) { - return false; + private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) { + packet.setChannelID(id); + + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); } - synchronized (sendLock) { - packet.setChannelID(id); + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); + } + + ActiveMQBuffer buffer = packet.encode(connection); - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); + lock.lock(); + + try { + if (failingOver) { + waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); } - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); + // Sanity check + if (transferring) { + throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover(); } - ActiveMQBuffer buffer = packet.encode(connection); + if (resendCache != null && packet.isRequiresConfirmations()) { + addResendPacket(packet); + } - lock.lock(); + } finally { + lock.unlock(); + } - try { - if (failingOver) { - waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); - } + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); + } - // Sanity check - if (transferring) { - throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover(); - } + checkReconnectID(reconnectID); - if (resendCache != null && packet.isRequiresConfirmations()) { - addResendPacket(packet); + //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, + //As the send could block if the response cache cannot add, preventing responses to be handled. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + while (!responseAsyncCache.add(packet)) { + try { + Thread.sleep(1); + } catch (Exception e) { + // Ignore } - - } finally { - lock.unlock(); } + } - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); + return buffer; + } + + private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { + return send(packet, reconnectID, flush, batch, null); + } + + // This must never called by more than one thread concurrently + private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch, Callback callback) { + if (invokeInterceptors(packet, interceptors, connection) != null) { + if (callback != null) { + callback.done(false); } + return false; + } - checkReconnectID(reconnectID); + synchronized (sendLock) { + ActiveMQBuffer buffer = beforeSend(packet, reconnectID); - //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, - //As the send could block if the response cache cannot add, preventing responses to be handled. - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - while (!responseAsyncCache.add(packet)) { - try { - Thread.sleep(1); - } catch (Exception e) { - // Ignore - } + // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp + // buffer is full, preventing any incoming buffers being handled and blocking failover + try { + if (callback == null) { + connection.getTransportConnection().write(buffer, flush, batch); + } else { + connection.getTransportConnection().write(buffer, flush, batch, (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess())); + } + } catch (Throwable t) { + //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. + //The client would get still know about this as the exception bubbles up the call stack instead. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + responseAsyncCache.remove(packet.getCorrelationID()); + } + if (callback != null) { + callback.done(false); } + throw t; + } + return true; + } + } + + @Override + public boolean send(Packet packet, Callback callback) { + return send(packet, -1, false, false, callback); + } + + @Override + public boolean send(Packet packet, + FileChannel fileChannel, + long offset, + int dataSize, + Callback callback) { + if (invokeInterceptors(packet, interceptors, connection) != null) { + if (callback != null) { + callback.done(false); } + return false; + } + + synchronized (sendLock) { + ActiveMQBuffer buffer = beforeSend(packet, -1); // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover try { - connection.getTransportConnection().write(buffer, flush, batch); + connection.getTransportConnection().write(buffer); + connection.getTransportConnection().write(fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess())); } catch (Throwable t) { //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. //The client would get still know about this as the exception bubbles up the call stack instead. if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { responseAsyncCache.remove(packet.getCorrelationID()); } + if (callback != null) { + callback.done(false); + } throw t; } return true; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index f8f85e8a428..a7a32539ff7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -336,7 +336,11 @@ protected void encodeHeader(ActiveMQBuffer buffer) { } protected void encodeSize(ActiveMQBuffer buffer) { - size = buffer.writerIndex(); + encodeSize(buffer, buffer.writerIndex()); + } + + protected void encodeSize(ActiveMQBuffer buffer, int size) { + this.size = size; // The length doesn't include the actual length byte int len = size - DataConstants.SIZE_INT; @@ -345,9 +349,10 @@ protected void encodeSize(ActiveMQBuffer buffer) { } protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) { + return createPacket(connection, expectedEncodeSize()); + } - int size = expectedEncodeSize(); - + protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) { if (connection == null) { return new ChannelBufferWrapper(Unpooled.buffer(size)); } else { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 418e3f150b9..cb64d3ddccf 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -244,8 +244,8 @@ public void destroy() { } @Override - public boolean blockUntilWritable(int size, long timeout) { - return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS); + public boolean blockUntilWritable(long timeout) { + return transportConnection.blockUntilWritable(timeout, TimeUnit.MILLISECONDS); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java new file mode 100644 index 00000000000..5678f32df4b --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.FileRegion; +import io.netty.handler.stream.ChunkedInput; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; + +/** + * A {@link ChunkedInput} that fetches data from a file chunk by chunk using + * NIO {@link FileChannel}. + *

+ * If your operating system supports + * zero-copy file transfer + * such as {@code sendfile()}, you might want to use {@link FileRegion} instead. + */ +class AbsoluteChunkedNioFile implements ChunkedInput { + + private final FileChannel in; + private final long startOffset; + private final long endOffset; + private final int chunkSize; + private long offset; + + /** + * Creates a new instance that fetches data from the specified file. + */ + AbsoluteChunkedNioFile(File in) throws IOException { + this(new RandomAccessFile(in, "r").getChannel()); + } + + /** + * Creates a new instance that fetches data from the specified file. + * + * @param chunkSize the number of bytes to fetch on each + * {@link #readChunk(ChannelHandlerContext)} call + */ + AbsoluteChunkedNioFile(File in, int chunkSize) throws IOException { + this(new RandomAccessFile(in, "r").getChannel(), chunkSize); + } + + /** + * Creates a new instance that fetches data from the specified file. + */ + AbsoluteChunkedNioFile(FileChannel in) throws IOException { + this(in, 8192); + } + + /** + * Creates a new instance that fetches data from the specified file. + * + * @param chunkSize the number of bytes to fetch on each + * {@link #readChunk(ChannelHandlerContext)} call + */ + AbsoluteChunkedNioFile(FileChannel in, int chunkSize) throws IOException { + this(in, 0, in.size(), chunkSize); + } + + /** + * Creates a new instance that fetches data from the specified file. + * + * @param offset the offset of the file where the transfer begins + * @param length the number of bytes to transfer + * @param chunkSize the number of bytes to fetch on each + * {@link #readChunk(ChannelHandlerContext)} call + */ + AbsoluteChunkedNioFile(FileChannel in, long offset, long length, int chunkSize) throws IOException { + if (in == null) { + throw new NullPointerException("in"); + } + if (offset < 0) { + throw new IllegalArgumentException("offset: " + offset + " (expected: 0 or greater)"); + } + if (length < 0) { + throw new IllegalArgumentException("length: " + length + " (expected: 0 or greater)"); + } + if (chunkSize <= 0) { + throw new IllegalArgumentException("chunkSize: " + chunkSize + " (expected: a positive integer)"); + } + if (!in.isOpen()) { + throw new ClosedChannelException(); + } + this.in = in; + this.chunkSize = chunkSize; + this.offset = startOffset = offset; + endOffset = offset + length; + } + + /** + * Returns the offset in the file where the transfer began. + */ + public long startOffset() { + return startOffset; + } + + /** + * Returns the offset in the file where the transfer will end. + */ + public long endOffset() { + return endOffset; + } + + /** + * Returns the offset in the file where the transfer is happening currently. + */ + public long currentOffset() { + return offset; + } + + @Override + public boolean isEndOfInput() throws Exception { + return !(offset < endOffset && in.isOpen()); + } + + @Override + public void close() throws Exception { + in.close(); + } + + @Deprecated + @Override + public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { + long offset = this.offset; + if (offset >= endOffset) { + return null; + } + + int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset); + ByteBuf buffer = allocator.buffer(chunkSize); + boolean release = true; + try { + int readBytes = 0; + for (;; ) { + int localReadBytes = buffer.writeBytes(in, offset + readBytes, chunkSize - readBytes); + if (localReadBytes < 0) { + break; + } + readBytes += localReadBytes; + if (readBytes == chunkSize) { + break; + } + } + this.offset += readBytes; + release = false; + return buffer; + } finally { + if (release) { + buffer.release(); + } + } + } + + @Override + public long length() { + return endOffset - startOffset; + } + + @Override + public long progress() { + return offset - startOffset; + } +} + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java index de8b49ef560..681eebb44ef 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java @@ -62,7 +62,11 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - listener.connectionReadyForWrites(channelId(ctx.channel()), ctx.channel().isWritable()); + try { + listener.connectionReadyForWrites(channelId(ctx.channel()), ctx.channel().isWritable()); + } finally { + ctx.fireChannelWritabilityChanged(); + } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 51330c727bc..3e9806c756a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.artemis.core.remoting.impl.netty; +import java.io.IOException; import java.net.SocketAddress; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -27,8 +29,13 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.internal.SystemPropertyUtil; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -46,8 +53,10 @@ public class NettyConnection implements Connection { private static final Logger logger = Logger.getLogger(NettyConnection.class); - + public static final String USE_FILE_REGION_PROP_NAME = "io.netty.file.region"; + private final boolean USE_FILE_REGION = SystemPropertyUtil.getBoolean(USE_FILE_REGION_PROP_NAME, true); private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", 8192); + private static final int CHUNKED_NIO_BYTES = 32 * 1024; private static final int DEFAULT_WAIT_MILLIS = 10_000; protected final Channel channel; @@ -90,6 +99,20 @@ public NettyConnection(final Map configuration, this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0; } + /** + * It prepares the {@link #getNettyChannel()}'s pipeline to be able to handle + * {@link io.netty.handler.stream.ChunkedInput} objects, if {@link DefaultFileRegion} are not supported. + * This method is not thread-safe and should be called only once on a connection lifecycle. + */ + public void initializeZeroCopyTransfer() { + final ChannelPipeline pipeline = channel.pipeline(); + if (!USE_FILE_REGION || pipeline.get(SslHandler.class) != null) { + if (pipeline.get(ChunkedWriteHandler.class) == null) { + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + } + } + } + private static void waitFor(ChannelPromise promise, long millis) { try { final boolean completed = promise.await(millis); @@ -117,10 +140,6 @@ private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark return writtenBytes; } - public final int pendingWritesOnChannel() { - return batchBufferSize(this.channel, this.writeBufferHighWaterMark); - } - public final Channel getNettyChannel() { return channel; } @@ -293,7 +312,7 @@ private void checkConnectionState() { } @Override - public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + public final boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) { checkConnectionState(); final boolean isAllowedToBlock = isAllowedToBlock(); if (!isAllowedToBlock) { @@ -308,7 +327,7 @@ public final boolean blockUntilWritable(final int requiredCapacity, final long t if (logger.isDebugEnabled()) { logger.debug("Calling blockUntilWritable using a thread where it's not allowed"); } - return canWrite(requiredCapacity); + return channel.isWritable(); } else { final long timeoutNanos = timeUnit.toNanos(timeout); final long deadline = System.nanoTime() + timeoutNanos; @@ -322,7 +341,7 @@ public final boolean blockUntilWritable(final int requiredCapacity, final long t parkNanos = 1000L; } boolean canWrite; - while (!(canWrite = canWrite(requiredCapacity)) && (System.nanoTime() - deadline) < 0) { + while (!(canWrite = channel.isWritable()) && (System.nanoTime() - deadline) < 0) { //periodically check the connection state checkConnectionState(); LockSupport.parkNanos(parkNanos); @@ -337,17 +356,28 @@ private boolean isAllowedToBlock() { return !inEventLoop; } - private boolean canWrite(final int requiredCapacity) { - //evaluate if the write request could be taken: - //there is enough space in the write buffer? - final long totalPendingWrites = this.pendingWritesOnChannel(); - final boolean canWrite; - if (requiredCapacity > this.writeBufferHighWaterMark) { - canWrite = totalPendingWrites == 0; + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { + return new DefaultFileRegion(fileChannel, offset, dataSize) { + @Override + protected void deallocate() { + //no op + } + }; } else { - canWrite = (totalPendingWrites + requiredCapacity) <= this.writeBufferHighWaterMark; + assert channel.pipeline().get(ChunkedWriteHandler.class) != null : + "ChunkedWriteHandler needs to be added to the pipeline to handle ChunkedNioFile"; + try { + return new AbsoluteChunkedNioFile(fileChannel, offset, dataSize, CHUNKED_NIO_BYTES) { + @Override + public void close() throws Exception { + //no op + } + }; + } catch (IOException e) { + throw new RuntimeException(e); + } } - return canWrite; } @Override @@ -390,6 +420,18 @@ public final void write(ActiveMQBuffer buffer, } } + @Override + public void write(FileChannel fileChannel, + long offset, + int dataSize, + final ChannelFutureListener futureListener) { + final ChannelPromise promise = futureListener != null ? channel.newPromise() : channel.voidPromise(); + ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(fileChannel, offset, dataSize), promise); + if (futureListener != null) { + channelFuture.addListener(futureListener); + } + } + private static void flushAndWait(final Channel channel, final ChannelPromise promise) { if (!channel.eventLoop().inEventLoop()) { waitFor(promise, DEFAULT_WAIT_MILLIS); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index ebde456034e..26dae1164c4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.spi.core.remoting; +import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelFutureListener; @@ -46,18 +47,16 @@ public interface Connection { boolean isOpen(); /** - * Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses. - * The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control - * only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers. + * Causes the current thread to wait until the connection can enqueue a write request unless the specified waiting time elapses. + * The available capacity of the connection could change concurrently hence this method is not suitable to perform precise flow-control. * If the current thread is not allowed to block the timeout will be ignored dependently on the connection type. * - * @param requiredCapacity the capacity in bytes to be enqueued * @param timeout the maximum time to wait * @param timeUnit the time unit of the timeout argument * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise * @throws IllegalStateException if the connection is closed */ - default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + default boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) { return true; } @@ -101,6 +100,8 @@ default boolean blockUntilWritable(final int requiredCapacity, final long timeou */ void write(ActiveMQBuffer buffer); + void write(FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener); + /** * This should close the internal channel without calling any listeners. * This is to avoid a situation where the broker is busy writing on an internal thread. diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java index 7d3fb23939b..9272a5e015f 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import javax.security.auth.Subject; +import java.nio.channels.FileChannel; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -237,7 +238,7 @@ public ActiveMQPrincipal getDefaultActiveMQPrincipal() { } @Override - public boolean blockUntilWritable(int size, long timeout) { + public boolean blockUntilWritable(long timeout) { return false; } @@ -392,6 +393,14 @@ public void write(ActiveMQBuffer buffer) { } + @Override + public void write(FileChannel fileChannel, + long offset, + int dataSize, + ChannelFutureListener channelFutureListener) { + + } + @Override public void forceClose() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 275805ec2bd..8a0ad93ac73 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -178,9 +178,12 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener * * @param replicator * @param pageIds + * @param flushFileTimeoutMillis * @throws Exception */ - void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception; + void sendPages(ReplicationManager replicator, + Collection pageIds, + long flushFileTimeoutMillis) throws Exception; /** * This method will disable cleanup of pages. No page will be deleted after this call. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 452be16e737..322ae20e5f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1185,7 +1185,9 @@ public Collection getCurrentIds() throws Exception { } @Override - public void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception { + public void sendPages(ReplicationManager replicator, + Collection pageIds, + long flushFileTimeoutMillis) throws Exception { final SequentialFileFactory factory = fileFactory; for (Integer id : pageIds) { SequentialFile sFile = factory.createSequentialFile(createFileName(id)); @@ -1193,7 +1195,7 @@ public void sendPages(ReplicationManager replicator, Collection pageIds continue; } ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size()); - replicator.syncPages(sFile, id, getAddress()); + replicator.syncPages(sFile, id, getAddress()).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index e5da1d9a485..f36cb5672bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -595,11 +595,13 @@ public SequentialFile createFileForLargeMessage(final long messageID, LargeMessa /** * Send an entire journal file to a replicating backup server. */ - private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception { + private void sendJournalFile(JournalFile[] journalFiles, + JournalContent type, + long flushFileTimeoutMillis) throws Exception { for (JournalFile jf : journalFiles) { if (!started) return; - replicator.syncJournalFile(jf, type); + replicator.syncJournalFile(jf, type).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS); } } @@ -688,10 +690,10 @@ public void startReplication(ReplicationManager replicationManager, storageManagerLock.writeLock().unlock(); } - sendJournalFile(messageFiles, JournalContent.MESSAGES); - sendJournalFile(bindingsFiles, JournalContent.BINDINGS); - sendLargeMessageFiles(pendingLargeMessages); - sendPagesToBackup(pageFilesToSync, pagingManager); + sendJournalFile(messageFiles, JournalContent.MESSAGES, initialReplicationSyncTimeout); + sendJournalFile(bindingsFiles, JournalContent.BINDINGS, initialReplicationSyncTimeout); + sendLargeMessageFiles(pendingLargeMessages, initialReplicationSyncTimeout); + sendPagesToBackup(pageFilesToSync, pagingManager, initialReplicationSyncTimeout); storageManagerLock.writeLock().lock(); try { @@ -714,7 +716,7 @@ public void startReplication(ReplicationManager replicationManager, } } - private void sendLargeMessageFiles(final Map> pendingLargeMessages) throws Exception { + private void sendLargeMessageFiles(final Map> pendingLargeMessages, long flushFileTimeoutMillis) throws Exception { Iterator>> iter = pendingLargeMessages.entrySet().iterator(); while (started && iter.hasNext()) { Map.Entry> entry = iter.next(); @@ -725,7 +727,7 @@ private void sendLargeMessageFiles(final Map> pendingLa if (!seqFile.exists()) continue; if (replicator != null) { - replicator.syncLargeMessageFile(seqFile, size, id); + replicator.syncLargeMessageFile(seqFile, size, id).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS); } else { throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull(); } @@ -792,12 +794,13 @@ private Map> recoverPendingLargeMessages() throws Excep * @throws Exception */ private void sendPagesToBackup(Map> pageFilesToSync, - PagingManager manager) throws Exception { + PagingManager manager, + long flushFileTimeoutMillis) throws Exception { for (Map.Entry> entry : pageFilesToSync.entrySet()) { if (!started) return; PagingStore store = manager.getPageStore(entry.getKey()); - store.sendPages(replicator, entry.getValue()); + store.sendPages(replicator, entry.getValue(), flushFileTimeoutMillis); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index b81782bcd05..84ad5c72221 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -16,22 +16,30 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.EnumSet; +import java.util.Objects; import java.util.Set; -import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.utils.DataConstants; +import org.jboss.logging.Logger; /** * Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls * which extra information is sent. */ public final class ReplicationSyncFileMessage extends PacketImpl { + private static final Logger logger = Logger.getLogger(ReplicationSyncFileMessage.class); /** * The JournalType or {@code null} if sync'ing large-messages. @@ -43,10 +51,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl { */ private long fileId; private int dataSize; - private ByteBuf byteBuffer; private byte[] byteArray; private SimpleString pageStoreName; private FileType fileType; + private RandomAccessFile raf; + private FileChannel fileChannel; + private long offset; public enum FileType { JOURNAL(0), PAGE(1), LARGE_MESSAGE(2); @@ -78,14 +88,18 @@ public ReplicationSyncFileMessage() { public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content, SimpleString storeName, long id, - int size, - ByteBuf buffer) { + RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int size) { this(); - this.byteBuffer = buffer; this.pageStoreName = storeName; this.dataSize = size; this.fileId = id; + this.raf = raf; + this.fileChannel = fileChannel; this.journalType = content; + this.offset = offset; determineType(); } @@ -99,10 +113,30 @@ private void determineType() { } } + public long getFileId() { + return fileId; + } + + public int getDataSize() { + return dataSize; + } + + public RandomAccessFile getRaf() { + return raf; + } + + public FileChannel getFileChannel() { + return fileChannel; + } + + public long getOffset() { + return offset; + } + @Override public int expectedEncodeSize() { int size = PACKET_HEADERS_SIZE + - DataConstants.SIZE_LONG; // buffer.writeLong(fileId); + DataConstants.SIZE_LONG; // buffer.writeLong(fileId); if (fileId == -1) return size; @@ -125,7 +159,7 @@ public int expectedEncodeSize() { size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize); if (dataSize > 0) { - size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); + size += dataSize; } return size; @@ -150,30 +184,55 @@ public void encodeRest(final ActiveMQBuffer buffer) { default: // no-op } - buffer.writeInt(dataSize); - /* - * sending -1 will close the file in case of a journal, but not in case of a largeMessage - * (which might receive appends) - */ - if (dataSize > 0) { - buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); - } + } - release(); + @Override + public ActiveMQBuffer encode(CoreRemotingConnection connection) { + if (fileId != -1 && dataSize > 0) { + ActiveMQBuffer buffer; + int bufferSize = expectedEncodeSize(); + int encodedSize = bufferSize; + boolean isNetty = false; + if (connection != null && connection.getTransportConnection() instanceof NettyConnection) { + bufferSize -= dataSize; + isNetty = true; + } + buffer = createPacket(connection, bufferSize); + encodeHeader(buffer); + encodeRest(buffer, connection); + if (!isNetty) { + ByteBuffer byteBuffer; + if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount() == 1) { + byteBuffer = buffer.byteBuf().internalNioBuffer(buffer.writerIndex(), buffer.writableBytes()); + } else { + byteBuffer = buffer.toByteBuffer(buffer.writerIndex(), buffer.writableBytes()); + } + readFile(byteBuffer); + buffer.writerIndex(buffer.capacity()); + } + encodeSize(buffer, encodedSize); + return buffer; + } else { + return super.encode(connection); + } } @Override public void release() { - if (byteBuffer != null) { - byteBuffer.release(); - byteBuffer = null; + if (raf != null) { + try { + raf.close(); + } catch (IOException e) { + logger.error("Close file " + this + " failed", e); + } } } @Override public void decodeRest(final ActiveMQBuffer buffer) { fileId = buffer.readLong(); + if (fileId == -1) return; switch (FileType.getFileType(buffer.readByte())) { case JOURNAL: { journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte()); @@ -197,6 +256,26 @@ public void decodeRest(final ActiveMQBuffer buffer) { } } + private void readFile(ByteBuffer buffer) { + try { + if (buffer.hasArray()) { + raf.seek(offset); + final int position = buffer.position(); + final int read = raf.read(buffer.array(), buffer.arrayOffset() + position, buffer.remaining()); + if (read >= 0) { + buffer.position(position + read); + } + } else { + if (!buffer.isDirect()) { + logger.debug("Expected direct ByteBuffer while using FileChannel: it can cause leaks unless -Djdk.nio.maxCachedBufferSize is correctly set"); + } + fileChannel.read(buffer, offset); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public long getId() { return fileId; } @@ -218,61 +297,22 @@ public SimpleString getPageStore() { } @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + Arrays.hashCode(byteArray); - result = prime * result + ((byteBuffer == null) ? 0 : byteBuffer.hashCode()); - result = prime * result + dataSize; - result = prime * result + (int) (fileId ^ (fileId >>> 32)); - result = prime * result + ((fileType == null) ? 0 : fileType.hashCode()); - result = prime * result + ((journalType == null) ? 0 : journalType.hashCode()); - result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { + public boolean equals(Object o) { + if (this == o) return true; - } - if (!super.equals(obj)) { - return false; - } - if (!(obj instanceof ReplicationSyncFileMessage)) { - return false; - } - ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj; - if (!Arrays.equals(byteArray, other.byteArray)) { - return false; - } - if (byteBuffer == null) { - if (other.byteBuffer != null) { - return false; - } - } else if (!byteBuffer.equals(other.byteBuffer)) { + if (o == null || getClass() != o.getClass()) return false; - } - if (dataSize != other.dataSize) { - return false; - } - if (fileId != other.fileId) { + if (!super.equals(o)) return false; - } - if (fileType != other.fileType) { - return false; - } - if (journalType != other.journalType) { - return false; - } - if (pageStoreName == null) { - if (other.pageStoreName != null) { - return false; - } - } else if (!pageStoreName.equals(other.pageStoreName)) { - return false; - } - return true; + ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o; + return fileId == that.fileId && dataSize == that.dataSize && offset == that.offset && journalType == that.journalType && Arrays.equals(byteArray, that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) && fileType == that.fileType && Objects.equals(raf, that.raf) && Objects.equals(fileChannel, that.fileChannel); + } + + @Override + public int hashCode() { + int result = Objects.hash(super.hashCode(), journalType, fileId, dataSize, pageStoreName, fileType, raf, fileChannel, offset); + result = 31 * result + Arrays.hashCode(byteArray); + return result; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index b2fc576db3c..c54a56857af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.remoting.impl.invm; +import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -242,6 +243,27 @@ public void run() { } + @Override + public void write(FileChannel fileChannel, + long offset, + int dataSize, + final ChannelFutureListener futureListener) { + if (futureListener == null) { + return; + } + try { + executor.execute(() -> { + try { + futureListener.operationComplete(null); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }); + } catch (RejectedExecutionException e) { + + } + } + @Override public String getRemoteAddress() { return "invm:" + serverID; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 1d1217d6162..ff0e85a2553 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -16,20 +16,20 @@ */ package org.apache.activemq.artemis.core.replication; -import java.io.FileInputStream; -import java.nio.ByteBuffer; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -381,7 +381,7 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp replicationStream.execute(() -> { if (enabled) { pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); + flowControl(); replicatingChannel.send(packet); } else { packet.release(); @@ -392,12 +392,60 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp return repliToken; } + private void sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk, CompletableFuture flushed) { + if (!enabled) { + syncFileMessage.release(); + if (flushed != null) { + flushed.completeExceptionally(new IllegalStateException("ReplicationManager wasn't enabled!")); + } + return; + } + + final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory); + repliToken.replicationLineUp(); + + replicationStream.execute(() -> { + if (enabled) { + try { + pendingTokens.add(repliToken); + flowControl(); + Channel.Callback callback = null; + if (flushed != null || lastChunk) { + callback = success -> { + if (lastChunk) { + syncFileMessage.release(); + } + if (flushed != null) { + if (success) { + flushed.complete(null); + } else { + flushed.completeExceptionally(new IOException("The file hasn't been flushed to the network")); + } + } + }; + } + if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { + replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(), + syncFileMessage.getOffset(), syncFileMessage.getDataSize(), callback); + } else { + replicatingChannel.send(syncFileMessage, callback); + } + } catch (Exception e) { + syncFileMessage.release(); + } + } else { + syncFileMessage.release(); + repliToken.replicationDone(); + } + }); + } + /** * This was written as a refactoring of sendReplicatePacket. * In case you refactor this in any way, this method must hold a lock on replication lock. . */ - private boolean flowControl(int size) { - boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout); + private boolean flowControl() { + boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(timeout); if (!flowWorked) { try { @@ -497,29 +545,23 @@ public int getEncodeSize() { * @throws ActiveMQException * @throws Exception */ - public void syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception { - if (!enabled) { - return; - } + public CompletableFuture syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception { SequentialFile file = jf.getFile().cloneFile(); try { ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size()); - sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE); + return sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE); } finally { if (file.isOpen()) file.close(); } } - public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception { - if (enabled) { - sendLargeFile(null, null, id, file, size); - } + public CompletableFuture syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception { + return sendLargeFile(null, null, id, file, size); } - public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception { - if (enabled) - sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); + public CompletableFuture syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception { + return sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); } private class FlushAction implements Runnable { @@ -540,6 +582,18 @@ public void run() { } } + private void sendEmptyFile(AbstractJournalStorageManager.JournalContent content, + SimpleString pageStore, + final long id, + String fileName, + CompletableFuture onFlushed) throws Exception { + logger.debugf("sending empty file %s", fileName); + final FlushAction action = new FlushAction(); + sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, null, + null, 0, 0), true, onFlushed); + flushReplicationStream(action); + } + /** * Sends large files in reasonably sized chunks to the backup during replication synchronization. * @@ -550,59 +604,78 @@ public void run() { * @param maxBytesToSend maximum number of bytes to read and send from the file * @throws Exception */ - private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, - SimpleString pageStore, - final long id, - SequentialFile file, - long maxBytesToSend) throws Exception { + private CompletableFuture sendLargeFile(AbstractJournalStorageManager.JournalContent content, + SimpleString pageStore, + final long id, + SequentialFile file, + long maxBytesToSend) throws Exception { if (!enabled) - return; + return CompletableFuture.completedFuture(null); if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; - + final int size = 1024 * 1024; + final long fileSize = file.size(); + final File javaFile = file.getJavaFile(); + if (fileSize == 0) { + final String fileName = file.getFileName(); + file.close(); + final CompletableFuture onFlushed = new CompletableFuture<>(); + sendEmptyFile(content, pageStore, id, fileName, onFlushed); + return onFlushed; + } int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; + CompletableFuture onFlushed = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - - // We can afford having a single buffer here for this entire loop - // because sendReplicatePacket will encode the packet as a NettyBuffer - // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy - while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(javaFile, "r"); + fileChannel = raf.getChannel(); + while (true) { + long chunkSize = Math.min(size, fileSize - offset); + int toSend = (int) chunkSize; + if (chunkSize > 0) { + if (chunkSize >= maxBytesToSend) { + toSend = (int) maxBytesToSend; + maxBytesToSend = 0; + } else { + maxBytesToSend = maxBytesToSend - chunkSize; } - if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) - break; } + logger.debugf("sending %d bytes on file %s", toSend, file.getFileName()); + // sending -1 or 0 bytes will close the file at the backup + // We cannot simply send everything of a file through the executor, + // otherwise we would run out of memory. + // so we don't use the executor here + final boolean lastChunk = offset + toSend == fileSize; + if (lastChunk && (toSend == 0 || maxBytesToSend == 0)) { + onFlushed = new CompletableFuture<>(); + } else { + onFlushed = null; + } + sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), lastChunk, onFlushed); + packetsSent++; + offset += toSend; + + if (packetsSent % flowControlSize == 0) { + flushReplicationStream(action); + } + if (toSend == 0 || maxBytesToSend == 0) + break; } flushReplicationStream(action); + assert onFlushed != null; + return onFlushed; + } catch (Exception e) { + if (raf != null) + raf.close(); + onFlushed.completeExceptionally(e); + throw e; } finally { if (file.isOpen()) file.close(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java index a0580941bea..b9661a1bfcf 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; @@ -54,6 +55,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assume.assumeTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -124,8 +126,14 @@ public void testStopReplicationDoesNotDeadlockWhileStopping() throws Exception { manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader); final ReplicationManager replicationManager = mock(ReplicationManager.class); final PagingManager pagingManager = mock(PagingManager.class); + when(replicationManager.syncJournalFile(any(JournalFile.class), any(AbstractJournalStorageManager.JournalContent.class))) + .thenReturn(CompletableFuture.completedFuture(null)); + when(replicationManager.syncLargeMessageFile(any(SequentialFile.class), anyLong(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(replicationManager.syncPages(any(SequentialFile.class), anyLong(), any(SimpleString.class))) + .thenReturn(CompletableFuture.completedFuture(null)); when(pagingManager.getStoreNames()).thenReturn(new SimpleString[0]); - manager.startReplication(replicationManager, pagingManager, UUID.randomUUID().toString(), false, 0); + manager.startReplication(replicationManager, pagingManager, UUID.randomUUID().toString(), false, Long.MAX_VALUE); final LargeServerMessage largeMessage = manager.createLargeMessage(manager.generateID() + 1, new CoreMessage()); largeMessage.setDurable(true); when(replicationManager.isSynchronizing()).thenReturn(true); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java new file mode 100644 index 00000000000..f01e5e6adbd --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.HashMap; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; +import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.DataConstants; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES; + +public class ReplicationSyncFileMessageTest extends ActiveMQTestBase { + @Test + public void testNettyConnectionEncodeMessage() throws Exception { + int dataSize = 10; + NettyConnection conn = new NettyConnection(new HashMap<>(), new EmbeddedChannel(), null, false, false); + + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, + null, 10, raf, fileChannel, 0, dataSize); + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null); + ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); + Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); + Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize); + file.close(); + } + + + @Test + public void testInVMConnectionEncodeMessage() throws Exception { + int fileId = 10; + InVMConnection conn = new InVMConnection(0, null, null, null); + + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, + null, fileId, raf, fileChannel, 0, 0); + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null); + ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); + Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); + Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize()); + + Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE); + + ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new ReplicationSyncFileMessage(); + decodedReplicationSyncFileMessage.decode(buffer); + Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), MESSAGES); + Assert.assertNull(decodedReplicationSyncFileMessage.getData()); + file.close(); + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java index c7887002387..6f1c9142cae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java @@ -257,9 +257,11 @@ class DelayPagingStoreImpl extends PagingStoreImpl { } @Override - public void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception { + public void sendPages(ReplicationManager replicator, + Collection pageIds, + long flushFileTimeoutMillis) throws Exception { //in order to extend the synchronization time Thread.sleep(20 * 1000); - super.sendPages(replicator, pageIds); + super.sendPages(replicator, pageIds, flushFileTimeoutMillis); } } \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index c7ed8699c14..3f0b782d0ea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.util; +import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -211,6 +212,22 @@ public boolean send(Packet packet) { return true; } + @Override + public boolean send(Packet packet, Callback callback) { + if (callback != null) { + callback.done(true); + } + return true; + } + + @Override + public boolean send(Packet packet, FileChannel fileChannel, long offset, int dataSize, Callback callback) { + if (callback != null) { + callback.done(true); + } + return true; + } + @Override public boolean sendBatched(Packet packet) { throw new UnsupportedOperationException(); diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 30aa4c361e5..16a304c9545 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -441,7 +441,9 @@ public Collection getCurrentIds() throws Exception { } @Override - public void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception { + public void sendPages(ReplicationManager replicator, + Collection pageIds, + long flushFileTimeoutMillis) throws Exception { } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index c9c975cf85c..e5369301150 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -29,6 +31,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -77,6 +82,29 @@ public void testCreateBuffer() throws Exception { } + @Test + public void testWritePacketAndFile() throws Exception { + EmbeddedChannel channel = createChannel(); + NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false); + + final int size = 1234; + + ActiveMQBuffer buff = conn.createTransportBuffer(size); + buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization. + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + + conn.write(buff); + conn.write(fileChannel, 0, size, future -> raf.close()); + channel.runPendingTasks(); + Assert.assertEquals(2, channel.outboundMessages().size()); + Assert.assertFalse(fileChannel.isOpen()); + file.close(); + } + @Test(expected = IllegalStateException.class) public void throwsExceptionOnBlockUntilWritableIfClosed() { EmbeddedChannel channel = createChannel(); @@ -84,7 +112,7 @@ public void throwsExceptionOnBlockUntilWritableIfClosed() { conn.close(); //to make sure the channel is closed it needs to run the pending tasks channel.runPendingTasks(); - conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS); + conn.blockUntilWritable(0, TimeUnit.NANOSECONDS); } @Test