From fd4f1d2b6abfb0242d80d64d416feb222d05a3b4 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Fri, 20 Sep 2019 07:39:15 +0200 Subject: [PATCH 1/6] Revert "ARTEMIS-2496 Revert catch up with zero-copy" This reverts commit 70c2200c --- .../artemis/core/protocol/core/Channel.java | 20 +++ .../core/protocol/core/impl/ChannelImpl.java | 118 ++++++++---- .../core/protocol/core/impl/PacketImpl.java | 11 +- .../remoting/impl/netty/NettyConnection.java | 41 +++++ .../netty/NonClosingDefaultFileRegion.java | 38 ++++ .../artemis/spi/core/remoting/Connection.java | 4 + .../protocol/core/impl/ChannelImplTest.java | 11 ++ .../ReplicationSyncFileMessage.java | 170 ++++++++++-------- .../remoting/impl/invm/InVMConnection.java | 24 +++ .../core/replication/ReplicationManager.java | 105 +++++++---- .../ReplicationSyncFileMessageTest.java | 85 +++++++++ .../cluster/util/BackupSyncDelay.java | 7 + .../impl/netty/NettyConnectionTest.java | 28 +++ 13 files changed, 513 insertions(+), 149 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 56f825959f1..e541dad8a4b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.core; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -66,6 +68,20 @@ public interface Channel { */ boolean send(Packet packet); + /** + * Sends a packet and file on this channel. + * + * @param packet the packet to send + * @param raf the file to send + * @param fileChannel the file channel retrieved from raf + * @param offset the position of the raf + * @param dataSize the data size to send + * @param callback callback after send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback); + /** * Sends a packet on this channel. * @@ -247,4 +263,8 @@ public interface Channel { * @param transferring whether the channel is transferring */ void setTransferring(boolean transferring); + + interface Callback { + void done(boolean success); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 154ab8aa809..d69b1e18357 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.EnumSet; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -25,6 +27,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; @@ -274,67 +277,104 @@ private void waitForFailOver(String timeoutMsg) { } } - // This must never called by more than one thread concurrently - private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { - if (invokeInterceptors(packet, interceptors, connection) != null) { - return false; + private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) { + packet.setChannelID(id); + + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); } - synchronized (sendLock) { - packet.setChannelID(id); + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); + } - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); + ActiveMQBuffer buffer = packet.encode(connection); + + lock.lock(); + + try { + if (failingOver) { + waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); } - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); + // Sanity check + if (transferring) { + throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover(); } - ActiveMQBuffer buffer = packet.encode(connection); + if (resendCache != null && packet.isRequiresConfirmations()) { + addResendPacket(packet); + } - lock.lock(); + } finally { + lock.unlock(); + } - try { - if (failingOver) { - waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); - } + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); + } - // Sanity check - if (transferring) { - throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover(); - } + checkReconnectID(reconnectID); - if (resendCache != null && packet.isRequiresConfirmations()) { - addResendPacket(packet); + //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, + //As the send could block if the response cache cannot add, preventing responses to be handled. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + while (!responseAsyncCache.add(packet)) { + try { + Thread.sleep(1); + } catch (Exception e) { + // Ignore } - - } finally { - lock.unlock(); } + } - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); - } + return buffer; + } - checkReconnectID(reconnectID); + // This must never called by more than one thread concurrently + private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { + if (invokeInterceptors(packet, interceptors, connection) != null) { + return false; + } - //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, - //As the send could block if the response cache cannot add, preventing responses to be handled. - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - while (!responseAsyncCache.add(packet)) { - try { - Thread.sleep(1); - } catch (Exception e) { - // Ignore - } + synchronized (sendLock) { + ActiveMQBuffer buffer = beforeSend(packet, reconnectID); + + // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp + // buffer is full, preventing any incoming buffers being handled and blocking failover + try { + connection.getTransportConnection().write(buffer, flush, batch); + } catch (Throwable t) { + //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. + //The client would get still know about this as the exception bubbles up the call stack instead. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + responseAsyncCache.remove(packet.getCorrelationID()); } + throw t; } + return true; + } + } + + @Override + public boolean send(Packet packet, + RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int dataSize, + Callback callback) { + if (invokeInterceptors(packet, interceptors, connection) != null) { + return false; + } + + synchronized (sendLock) { + ActiveMQBuffer buffer = beforeSend(packet, -1); // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover try { - connection.getTransportConnection().write(buffer, flush, batch); + connection.getTransportConnection().write(buffer); + connection.getTransportConnection().write(raf, fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess())); } catch (Throwable t) { //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. //The client would get still know about this as the exception bubbles up the call stack instead. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index f8f85e8a428..a7a32539ff7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -336,7 +336,11 @@ protected void encodeHeader(ActiveMQBuffer buffer) { } protected void encodeSize(ActiveMQBuffer buffer) { - size = buffer.writerIndex(); + encodeSize(buffer, buffer.writerIndex()); + } + + protected void encodeSize(ActiveMQBuffer buffer, int size) { + this.size = size; // The length doesn't include the actual length byte int len = size - DataConstants.SIZE_INT; @@ -345,9 +349,10 @@ protected void encodeSize(ActiveMQBuffer buffer) { } protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) { + return createPacket(connection, expectedEncodeSize()); + } - int size = expectedEncodeSize(); - + protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) { if (connection == null) { return new ChannelBufferWrapper(Unpooled.buffer(size)); } else { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 51330c727bc..497448e547b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.artemis.core.remoting.impl.netty; +import java.io.IOException; +import java.io.RandomAccessFile; import java.net.SocketAddress; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,6 +32,8 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -350,6 +355,18 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } + private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { + if (channel.pipeline().get(SslHandler.class) == null) { + return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + } else { + try { + return new ChunkedFile(raf, offset, dataSize, 8192); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + @Override public final void write(ActiveMQBuffer buffer, final boolean flush, @@ -390,6 +407,30 @@ public final void write(ActiveMQBuffer buffer, } } + @Override + public void write(RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int dataSize, + final ChannelFutureListener futureListener) { + final int readableBytes = dataSize; + if (logger.isDebugEnabled()) { + final int remainingBytes = this.writeBufferHighWaterMark - readableBytes; + if (remainingBytes < 0) { + logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes"); + } + } + + //no need to lock because the Netty's channel is thread-safe + //and the order of write is ensured by the order of the write calls + final Channel channel = this.channel; + assert readableBytes >= 0; + ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, fileChannel, offset, dataSize)); + if (futureListener != null) { + channelFuture.addListener(futureListener); + } + } + private static void flushAndWait(final Channel channel, final ChannelPromise promise) { if (!channel.eventLoop().inEventLoop()) { waitFor(promise, DEFAULT_WAIT_MILLIS); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java new file mode 100644 index 00000000000..4fc367fa2b4 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import java.io.File; +import java.nio.channels.FileChannel; + +import io.netty.channel.DefaultFileRegion; + +public class NonClosingDefaultFileRegion extends DefaultFileRegion { + + public NonClosingDefaultFileRegion(FileChannel file, long position, long count) { + super(file, position, count); + } + + public NonClosingDefaultFileRegion(File f, long position, long count) { + super(f, position, count); + } + + @Override + protected void deallocate() { + // Overridden to avoid closing the file + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index ebde456034e..fe5d3950f14 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.spi.core.remoting; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelFutureListener; @@ -101,6 +103,8 @@ default boolean blockUntilWritable(final int requiredCapacity, final long timeou */ void write(ActiveMQBuffer buffer); + void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener); + /** * This should close the internal channel without calling any listeners. * This is to avoid a situation where the broker is busy writing on an internal thread. diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java index 7d3fb23939b..4a4ca39512c 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import javax.security.auth.Subject; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -392,6 +394,15 @@ public void write(ActiveMQBuffer buffer) { } + @Override + public void write(RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int dataSize, + ChannelFutureListener channelFutureListener) { + + } + @Override public void forceClose() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index b81782bcd05..5a30c64bbe5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -16,22 +16,30 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.EnumSet; +import java.util.Objects; import java.util.Set; -import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.utils.DataConstants; +import org.jboss.logging.Logger; /** * Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls * which extra information is sent. */ public final class ReplicationSyncFileMessage extends PacketImpl { + private static final Logger logger = Logger.getLogger(ReplicationSyncFileMessage.class); /** * The JournalType or {@code null} if sync'ing large-messages. @@ -43,10 +51,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl { */ private long fileId; private int dataSize; - private ByteBuf byteBuffer; private byte[] byteArray; private SimpleString pageStoreName; private FileType fileType; + private RandomAccessFile raf; + private FileChannel fileChannel; + private long offset; public enum FileType { JOURNAL(0), PAGE(1), LARGE_MESSAGE(2); @@ -78,14 +88,18 @@ public ReplicationSyncFileMessage() { public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content, SimpleString storeName, long id, - int size, - ByteBuf buffer) { + RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int size) { this(); - this.byteBuffer = buffer; this.pageStoreName = storeName; this.dataSize = size; this.fileId = id; + this.raf = raf; + this.fileChannel = fileChannel; this.journalType = content; + this.offset = offset; determineType(); } @@ -99,10 +113,30 @@ private void determineType() { } } + public long getFileId() { + return fileId; + } + + public int getDataSize() { + return dataSize; + } + + public RandomAccessFile getRaf() { + return raf; + } + + public FileChannel getFileChannel() { + return fileChannel; + } + + public long getOffset() { + return offset; + } + @Override public int expectedEncodeSize() { int size = PACKET_HEADERS_SIZE + - DataConstants.SIZE_LONG; // buffer.writeLong(fileId); + DataConstants.SIZE_LONG; // buffer.writeLong(fileId); if (fileId == -1) return size; @@ -125,7 +159,7 @@ public int expectedEncodeSize() { size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize); if (dataSize > 0) { - size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); + size += dataSize; } return size; @@ -150,30 +184,55 @@ public void encodeRest(final ActiveMQBuffer buffer) { default: // no-op } - buffer.writeInt(dataSize); - /* - * sending -1 will close the file in case of a journal, but not in case of a largeMessage - * (which might receive appends) - */ - if (dataSize > 0) { - buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); - } + } - release(); + @Override + public ActiveMQBuffer encode(CoreRemotingConnection connection) { + if (fileId != -1 && dataSize > 0) { + ActiveMQBuffer buffer; + int bufferSize = expectedEncodeSize(); + int encodedSize = bufferSize; + boolean isNetty = false; + if (connection != null && connection.getTransportConnection() instanceof NettyConnection) { + bufferSize -= dataSize; + isNetty = true; + } + buffer = createPacket(connection, bufferSize); + encodeHeader(buffer); + encodeRest(buffer, connection); + if (!isNetty) { + ByteBuffer byteBuffer; + if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount() == 1) { + byteBuffer = buffer.byteBuf().internalNioBuffer(buffer.writerIndex(), buffer.writableBytes()); + } else { + byteBuffer = buffer.toByteBuffer(buffer.writerIndex(), buffer.writableBytes()); + } + readFile(byteBuffer); + buffer.writerIndex(buffer.capacity()); + } + encodeSize(buffer, encodedSize); + return buffer; + } else { + return super.encode(connection); + } } @Override public void release() { - if (byteBuffer != null) { - byteBuffer.release(); - byteBuffer = null; + if (raf != null) { + try { + raf.close(); + } catch (IOException e) { + logger.error("Close file " + this + " failed", e); + } } } @Override public void decodeRest(final ActiveMQBuffer buffer) { fileId = buffer.readLong(); + if (fileId == -1) return; switch (FileType.getFileType(buffer.readByte())) { case JOURNAL: { journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte()); @@ -197,6 +256,14 @@ public void decodeRest(final ActiveMQBuffer buffer) { } } + private void readFile(ByteBuffer buffer) { + try { + fileChannel.read(buffer, offset); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public long getId() { return fileId; } @@ -218,61 +285,22 @@ public SimpleString getPageStore() { } @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + Arrays.hashCode(byteArray); - result = prime * result + ((byteBuffer == null) ? 0 : byteBuffer.hashCode()); - result = prime * result + dataSize; - result = prime * result + (int) (fileId ^ (fileId >>> 32)); - result = prime * result + ((fileType == null) ? 0 : fileType.hashCode()); - result = prime * result + ((journalType == null) ? 0 : journalType.hashCode()); - result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { + public boolean equals(Object o) { + if (this == o) return true; - } - if (!super.equals(obj)) { - return false; - } - if (!(obj instanceof ReplicationSyncFileMessage)) { - return false; - } - ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj; - if (!Arrays.equals(byteArray, other.byteArray)) { - return false; - } - if (byteBuffer == null) { - if (other.byteBuffer != null) { - return false; - } - } else if (!byteBuffer.equals(other.byteBuffer)) { + if (o == null || getClass() != o.getClass()) return false; - } - if (dataSize != other.dataSize) { - return false; - } - if (fileId != other.fileId) { + if (!super.equals(o)) return false; - } - if (fileType != other.fileType) { - return false; - } - if (journalType != other.journalType) { - return false; - } - if (pageStoreName == null) { - if (other.pageStoreName != null) { - return false; - } - } else if (!pageStoreName.equals(other.pageStoreName)) { - return false; - } - return true; + ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o; + return fileId == that.fileId && dataSize == that.dataSize && offset == that.offset && journalType == that.journalType && Arrays.equals(byteArray, that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) && fileType == that.fileType && Objects.equals(raf, that.raf) && Objects.equals(fileChannel, that.fileChannel); + } + + @Override + public int hashCode() { + int result = Objects.hash(super.hashCode(), journalType, fileId, dataSize, pageStoreName, fileType, raf, fileChannel, offset); + result = 31 * result + Arrays.hashCode(byteArray); + return result; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index b2fc576db3c..02f1c84acf1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.remoting.impl.invm; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -242,6 +244,28 @@ public void run() { } + @Override + public void write(RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int dataSize, + final ChannelFutureListener futureListener) { + if (futureListener == null) { + return; + } + try { + executor.execute(() -> { + try { + futureListener.operationComplete(null); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }); + } catch (RejectedExecutionException e) { + + } + } + @Override public String getRemoteAddress() { return "invm:" + serverID; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 1d1217d6162..d48a5a01c8d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -16,8 +16,7 @@ */ package org.apache.activemq.artemis.core.replication; -import java.io.FileInputStream; -import java.nio.ByteBuffer; +import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.LinkedHashSet; @@ -28,8 +27,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -392,6 +389,39 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp return repliToken; } + private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) { + if (!enabled) { + syncFileMessage.release(); + return null; + } + + final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory); + repliToken.replicationLineUp(); + + replicationStream.execute(() -> { + if (enabled) { + try { + pendingTokens.add(repliToken); + flowControl(syncFileMessage.expectedEncodeSize()); + if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { + replicatingChannel.send(syncFileMessage, syncFileMessage.getRaf(), syncFileMessage.getFileChannel(), + syncFileMessage.getOffset(), syncFileMessage.getDataSize(), + lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); + } else { + replicatingChannel.send(syncFileMessage); + } + } catch (Exception e) { + syncFileMessage.release(); + } + } else { + syncFileMessage.release(); + repliToken.replicationDone(); + } + }); + + return repliToken; + } + /** * This was written as a refactoring of sendReplicatePacket. * In case you refactor this in any way, this method must hold a lock on replication lock. . @@ -560,49 +590,52 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - - // We can afford having a single buffer here for this entire loop - // because sendReplicatePacket will encode the packet as a NettyBuffer - // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy - while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } - } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + raf = new RandomAccessFile(file.getJavaFile(), "r"); + fileChannel = raf.getChannel(); + while (true) { + long chunkSize = Math.min(size, fileSize - offset); + int toSend = (int) chunkSize; + if (chunkSize > 0) { + if (chunkSize >= maxBytesToSend) { + toSend = (int) maxBytesToSend; + maxBytesToSend = 0; + } else { + maxBytesToSend = maxBytesToSend - chunkSize; } - if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) - break; } + logger.debug("sending " + toSend + " bytes on file " + file.getFileName()); + // sending -1 or 0 bytes will close the file at the backup + // We cannot simply send everything of a file through the executor, + // otherwise we would run out of memory. + // so we don't use the executor here + sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize); + packetsSent++; + offset += toSend; + + if (packetsSent % flowControlSize == 0) { + flushReplicationStream(action); + } + if (toSend == 0 || maxBytesToSend == 0) + break; } flushReplicationStream(action); + + } catch (Exception e) { + if (raf != null) + raf.close(); + throw e; } finally { if (file.isOpen()) file.close(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java new file mode 100644 index 00000000000..f01e5e6adbd --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.HashMap; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; +import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.DataConstants; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES; + +public class ReplicationSyncFileMessageTest extends ActiveMQTestBase { + @Test + public void testNettyConnectionEncodeMessage() throws Exception { + int dataSize = 10; + NettyConnection conn = new NettyConnection(new HashMap<>(), new EmbeddedChannel(), null, false, false); + + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, + null, 10, raf, fileChannel, 0, dataSize); + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null); + ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); + Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); + Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize); + file.close(); + } + + + @Test + public void testInVMConnectionEncodeMessage() throws Exception { + int fileId = 10; + InVMConnection conn = new InVMConnection(0, null, null, null); + + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, + null, fileId, raf, fileChannel, 0, 0); + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null); + ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); + Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); + Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize()); + + Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE); + + ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new ReplicationSyncFileMessage(); + decodedReplicationSyncFileMessage.decode(buffer); + Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), MESSAGES); + Assert.assertNull(decodedReplicationSyncFileMessage.getData()); + file.close(); + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index c7ed8699c14..c55764a0962 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.util; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -211,6 +213,11 @@ public boolean send(Packet packet) { return true; } + @Override + public boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback) { + return true; + } + @Override public boolean sendBatched(Packet packet) { throw new UnsupportedOperationException(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index c9c975cf85c..23ae5f9a7c9 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -29,6 +31,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -77,6 +82,29 @@ public void testCreateBuffer() throws Exception { } + @Test + public void testWritePacketAndFile() throws Exception { + EmbeddedChannel channel = createChannel(); + NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false); + + final int size = 1234; + + ActiveMQBuffer buff = conn.createTransportBuffer(size); + buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization. + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + + conn.write(buff); + conn.write(raf, fileChannel, 0, size, future -> raf.close()); + channel.runPendingTasks(); + Assert.assertEquals(2, channel.outboundMessages().size()); + Assert.assertFalse(fileChannel.isOpen()); + file.close(); + } + @Test(expected = IllegalStateException.class) public void throwsExceptionOnBlockUntilWritableIfClosed() { EmbeddedChannel channel = createChannel(); From 3d8d2bbcb394d4aabcb5bdffcef9c400ddc2475f Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Fri, 20 Sep 2019 09:14:54 +0200 Subject: [PATCH 2/6] ARTEMIS-2336 allow to switch between transfer file modes --- .../artemis/core/protocol/core/Channel.java | 4 +- .../core/protocol/core/impl/ChannelImpl.java | 10 +- .../impl/netty/AbsoluteChunkedNioFile.java | 190 ++++++++++++++++++ .../impl/netty/ActiveMQChannelHandler.java | 6 +- .../remoting/impl/netty/NettyConnection.java | 62 ++++-- .../netty/NonClosingDefaultFileRegion.java | 38 ---- .../artemis/spi/core/remoting/Connection.java | 3 +- .../protocol/core/impl/ChannelImplTest.java | 4 +- .../ReplicationSyncFileMessage.java | 14 +- .../remoting/impl/invm/InVMConnection.java | 4 +- .../core/replication/ReplicationManager.java | 2 +- .../cluster/util/BackupSyncDelay.java | 3 +- .../impl/netty/NettyConnectionTest.java | 2 +- 13 files changed, 262 insertions(+), 80 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java delete mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index e541dad8a4b..4c3b1a943f6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.core; -import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; @@ -72,7 +71,6 @@ public interface Channel { * Sends a packet and file on this channel. * * @param packet the packet to send - * @param raf the file to send * @param fileChannel the file channel retrieved from raf * @param offset the position of the raf * @param dataSize the data size to send @@ -80,7 +78,7 @@ public interface Channel { * @return false if the packet was rejected by an outgoing interceptor; true if the send was * successful */ - boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback); + boolean send(Packet packet, FileChannel fileChannel, long offset, int dataSize, Callback callback); /** * Sends a packet on this channel. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index d69b1e18357..352a5bcb072 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; -import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.EnumSet; import java.util.List; @@ -42,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.ConcurrentUtil; import org.jboss.logging.Logger; @@ -151,6 +151,11 @@ public ChannelImpl(final CoreRemotingConnection connection, } this.interceptors = interceptors; + //zero copy transfer is initialized only for replication channels + if (id == CHANNEL_ID.REPLICATION.id && connection.getTransportConnection() instanceof NettyConnection) { + final NettyConnection nettyConnection = (NettyConnection) connection.getTransportConnection(); + nettyConnection.initializeZeroCopyTransfer(); + } } @Override @@ -358,7 +363,6 @@ private boolean send(final Packet packet, final int reconnectID, final boolean f @Override public boolean send(Packet packet, - RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, @@ -374,7 +378,7 @@ public boolean send(Packet packet, // buffer is full, preventing any incoming buffers being handled and blocking failover try { connection.getTransportConnection().write(buffer); - connection.getTransportConnection().write(raf, fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess())); + connection.getTransportConnection().write(fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess())); } catch (Throwable t) { //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. //The client would get still know about this as the exception bubbles up the call stack instead. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java new file mode 100644 index 00000000000..5678f32df4b --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/AbsoluteChunkedNioFile.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.FileRegion; +import io.netty.handler.stream.ChunkedInput; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; + +/** + * A {@link ChunkedInput} that fetches data from a file chunk by chunk using + * NIO {@link FileChannel}. + *

+ * If your operating system supports + * zero-copy file transfer + * such as {@code sendfile()}, you might want to use {@link FileRegion} instead. + */ +class AbsoluteChunkedNioFile implements ChunkedInput { + + private final FileChannel in; + private final long startOffset; + private final long endOffset; + private final int chunkSize; + private long offset; + + /** + * Creates a new instance that fetches data from the specified file. + */ + AbsoluteChunkedNioFile(File in) throws IOException { + this(new RandomAccessFile(in, "r").getChannel()); + } + + /** + * Creates a new instance that fetches data from the specified file. + * + * @param chunkSize the number of bytes to fetch on each + * {@link #readChunk(ChannelHandlerContext)} call + */ + AbsoluteChunkedNioFile(File in, int chunkSize) throws IOException { + this(new RandomAccessFile(in, "r").getChannel(), chunkSize); + } + + /** + * Creates a new instance that fetches data from the specified file. + */ + AbsoluteChunkedNioFile(FileChannel in) throws IOException { + this(in, 8192); + } + + /** + * Creates a new instance that fetches data from the specified file. + * + * @param chunkSize the number of bytes to fetch on each + * {@link #readChunk(ChannelHandlerContext)} call + */ + AbsoluteChunkedNioFile(FileChannel in, int chunkSize) throws IOException { + this(in, 0, in.size(), chunkSize); + } + + /** + * Creates a new instance that fetches data from the specified file. + * + * @param offset the offset of the file where the transfer begins + * @param length the number of bytes to transfer + * @param chunkSize the number of bytes to fetch on each + * {@link #readChunk(ChannelHandlerContext)} call + */ + AbsoluteChunkedNioFile(FileChannel in, long offset, long length, int chunkSize) throws IOException { + if (in == null) { + throw new NullPointerException("in"); + } + if (offset < 0) { + throw new IllegalArgumentException("offset: " + offset + " (expected: 0 or greater)"); + } + if (length < 0) { + throw new IllegalArgumentException("length: " + length + " (expected: 0 or greater)"); + } + if (chunkSize <= 0) { + throw new IllegalArgumentException("chunkSize: " + chunkSize + " (expected: a positive integer)"); + } + if (!in.isOpen()) { + throw new ClosedChannelException(); + } + this.in = in; + this.chunkSize = chunkSize; + this.offset = startOffset = offset; + endOffset = offset + length; + } + + /** + * Returns the offset in the file where the transfer began. + */ + public long startOffset() { + return startOffset; + } + + /** + * Returns the offset in the file where the transfer will end. + */ + public long endOffset() { + return endOffset; + } + + /** + * Returns the offset in the file where the transfer is happening currently. + */ + public long currentOffset() { + return offset; + } + + @Override + public boolean isEndOfInput() throws Exception { + return !(offset < endOffset && in.isOpen()); + } + + @Override + public void close() throws Exception { + in.close(); + } + + @Deprecated + @Override + public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { + long offset = this.offset; + if (offset >= endOffset) { + return null; + } + + int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset); + ByteBuf buffer = allocator.buffer(chunkSize); + boolean release = true; + try { + int readBytes = 0; + for (;; ) { + int localReadBytes = buffer.writeBytes(in, offset + readBytes, chunkSize - readBytes); + if (localReadBytes < 0) { + break; + } + readBytes += localReadBytes; + if (readBytes == chunkSize) { + break; + } + } + this.offset += readBytes; + release = false; + return buffer; + } finally { + if (release) { + buffer.release(); + } + } + } + + @Override + public long length() { + return endOffset - startOffset; + } + + @Override + public long progress() { + return offset - startOffset; + } +} + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java index de8b49ef560..681eebb44ef 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java @@ -62,7 +62,11 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - listener.connectionReadyForWrites(channelId(ctx.channel()), ctx.channel().isWritable()); + try { + listener.connectionReadyForWrites(channelId(ctx.channel()), ctx.channel().isWritable()); + } finally { + ctx.fireChannelWritabilityChanged(); + } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 497448e547b..8653469c2e6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.remoting.impl.netty; import java.io.IOException; -import java.io.RandomAccessFile; import java.net.SocketAddress; import java.nio.channels.FileChannel; import java.util.ArrayList; @@ -30,10 +29,13 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedFile; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.internal.SystemPropertyUtil; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -51,8 +53,10 @@ public class NettyConnection implements Connection { private static final Logger logger = Logger.getLogger(NettyConnection.class); - + public static final String USE_FILE_REGION_PROP_NAME = "io.netty.file.region"; + private final boolean USE_FILE_REGION = SystemPropertyUtil.getBoolean(USE_FILE_REGION_PROP_NAME, true); private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", 8192); + private static final int CHUNKED_NIO_BYTES = 32 * 1024; private static final int DEFAULT_WAIT_MILLIS = 10_000; protected final Channel channel; @@ -95,6 +99,20 @@ public NettyConnection(final Map configuration, this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0; } + /** + * It prepares the {@link #getNettyChannel()}'s pipeline to be able to handle + * {@link io.netty.handler.stream.ChunkedInput} objects, if {@link DefaultFileRegion} are not supported. + * This method is not thread-safe and should be called only once on a connection lifecycle. + */ + public void initializeZeroCopyTransfer() { + final ChannelPipeline pipeline = channel.pipeline(); + if (!USE_FILE_REGION || pipeline.get(SslHandler.class) != null) { + if (pipeline.get(ChunkedWriteHandler.class) == null) { + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + } + } + } + private static void waitFor(ChannelPromise promise, long millis) { try { final boolean completed = promise.await(millis); @@ -355,12 +373,24 @@ private boolean canWrite(final int requiredCapacity) { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { + if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { + return new DefaultFileRegion(fileChannel, offset, dataSize) { + @Override + protected void deallocate() { + //no op + } + }; } else { + assert channel.pipeline().get(ChunkedWriteHandler.class) != null : + "ChunkedWriteHandler needs to be added to the pipeline to handle ChunkedNioFile"; try { - return new ChunkedFile(raf, offset, dataSize, 8192); + return new AbsoluteChunkedNioFile(fileChannel, offset, dataSize, CHUNKED_NIO_BYTES) { + @Override + public void close() throws Exception { + //no op + } + }; } catch (IOException e) { throw new RuntimeException(e); } @@ -408,24 +438,12 @@ public final void write(ActiveMQBuffer buffer, } @Override - public void write(RandomAccessFile raf, - FileChannel fileChannel, + public void write(FileChannel fileChannel, long offset, int dataSize, final ChannelFutureListener futureListener) { - final int readableBytes = dataSize; - if (logger.isDebugEnabled()) { - final int remainingBytes = this.writeBufferHighWaterMark - readableBytes; - if (remainingBytes < 0) { - logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes"); - } - } - - //no need to lock because the Netty's channel is thread-safe - //and the order of write is ensured by the order of the write calls - final Channel channel = this.channel; - assert readableBytes >= 0; - ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, fileChannel, offset, dataSize)); + final ChannelPromise promise = futureListener != null ? channel.newPromise() : channel.voidPromise(); + ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(fileChannel, offset, dataSize), promise); if (futureListener != null) { channelFuture.addListener(futureListener); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java deleted file mode 100644 index 4fc367fa2b4..00000000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.remoting.impl.netty; - -import java.io.File; -import java.nio.channels.FileChannel; - -import io.netty.channel.DefaultFileRegion; - -public class NonClosingDefaultFileRegion extends DefaultFileRegion { - - public NonClosingDefaultFileRegion(FileChannel file, long position, long count) { - super(file, position, count); - } - - public NonClosingDefaultFileRegion(File f, long position, long count) { - super(f, position, count); - } - - @Override - protected void deallocate() { - // Overridden to avoid closing the file - } -} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index fe5d3950f14..68261bafe8d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.spi.core.remoting; -import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; @@ -103,7 +102,7 @@ default boolean blockUntilWritable(final int requiredCapacity, final long timeou */ void write(ActiveMQBuffer buffer); - void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener); + void write(FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener); /** * This should close the internal channel without calling any listeners. diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java index 4a4ca39512c..a318a0fb8c5 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import javax.security.auth.Subject; -import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.List; import java.util.concurrent.Future; @@ -395,8 +394,7 @@ public void write(ActiveMQBuffer buffer) { } @Override - public void write(RandomAccessFile raf, - FileChannel fileChannel, + public void write(FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener channelFutureListener) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index 5a30c64bbe5..84ad5c72221 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -258,7 +258,19 @@ public void decodeRest(final ActiveMQBuffer buffer) { private void readFile(ByteBuffer buffer) { try { - fileChannel.read(buffer, offset); + if (buffer.hasArray()) { + raf.seek(offset); + final int position = buffer.position(); + final int read = raf.read(buffer.array(), buffer.arrayOffset() + position, buffer.remaining()); + if (read >= 0) { + buffer.position(position + read); + } + } else { + if (!buffer.isDirect()) { + logger.debug("Expected direct ByteBuffer while using FileChannel: it can cause leaks unless -Djdk.nio.maxCachedBufferSize is correctly set"); + } + fileChannel.read(buffer, offset); + } } catch (IOException e) { throw new RuntimeException(e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index 02f1c84acf1..c54a56857af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.remoting.impl.invm; -import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Map; @@ -245,8 +244,7 @@ public void run() { } @Override - public void write(RandomAccessFile raf, - FileChannel fileChannel, + public void write(FileChannel fileChannel, long offset, int dataSize, final ChannelFutureListener futureListener) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index d48a5a01c8d..9da83a47f32 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -404,7 +404,7 @@ private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage sy pendingTokens.add(repliToken); flowControl(syncFileMessage.expectedEncodeSize()); if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { - replicatingChannel.send(syncFileMessage, syncFileMessage.getRaf(), syncFileMessage.getFileChannel(), + replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(), syncFileMessage.getOffset(), syncFileMessage.getDataSize(), lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); } else { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index c55764a0962..cb91f7c0e4b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.util; -import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; @@ -214,7 +213,7 @@ public boolean send(Packet packet) { } @Override - public boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback) { + public boolean send(Packet packet, FileChannel fileChannel, long offset, int dataSize, Callback callback) { return true; } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index 23ae5f9a7c9..26904932636 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -98,7 +98,7 @@ public void testWritePacketAndFile() throws Exception { FileChannel fileChannel = raf.getChannel(); conn.write(buff); - conn.write(raf, fileChannel, 0, size, future -> raf.close()); + conn.write(fileChannel, 0, size, future -> raf.close()); channel.runPendingTasks(); Assert.assertEquals(2, channel.outboundMessages().size()); Assert.assertFalse(fileChannel.isOpen()); From 5f08dfd6bb48bce971da27038dc0493fbb9b15a4 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 25 Sep 2019 15:10:14 +0200 Subject: [PATCH 3/6] ARTEMIS-2336 Fast-path to send empty files --- .../core/replication/ReplicationManager.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 9da83a47f32..fe364c06e38 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -570,6 +570,16 @@ public void run() { } } + private void sendEmptyFile(AbstractJournalStorageManager.JournalContent content, + SimpleString pageStore, + final long id, + String fileName) throws Exception { + logger.debugf("sending empty file %s", fileName); + final FlushAction action = new FlushAction(); + sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, null, null, 0, 0), true); + flushReplicationStream(action); + } + /** * Sends large files in reasonably sized chunks to the backup during replication synchronization. * @@ -591,8 +601,13 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, file.open(); } final int size = 1024 * 1024; - long fileSize = file.size(); - + final long fileSize = file.size(); + if (fileSize == 0) { + final String fileName = file.getFileName(); + file.close(); + sendEmptyFile(content, pageStore, id, fileName); + return; + } int flowControlSize = 10; int packetsSent = 0; From e434f186fb27e21f7830a96018a97648a6ee8098 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 25 Sep 2019 15:46:31 +0200 Subject: [PATCH 4/6] ARTEMIS-2336 Simplify flow control from Netty --- .../protocol/core/CoreRemotingConnection.java | 4 +--- .../core/impl/ActiveMQSessionContext.java | 5 ++-- .../core/impl/RemotingConnectionImpl.java | 4 ++-- .../remoting/impl/netty/NettyConnection.java | 23 +++---------------- .../artemis/spi/core/remoting/Connection.java | 8 +++---- .../protocol/core/impl/ChannelImplTest.java | 2 +- .../core/replication/ReplicationManager.java | 8 +++---- .../impl/netty/NettyConnectionTest.java | 2 +- 8 files changed, 18 insertions(+), 38 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index 377b1b59016..cf35216d3b0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -134,11 +134,9 @@ default boolean isVersionNewFQQN() { ActiveMQPrincipal getDefaultActiveMQPrincipal(); /** - * - * @param size size we are trying to write * @param timeout * @return * @throws IllegalStateException if the connection is closed */ - boolean blockUntilWritable(int size, long timeout); + boolean blockUntilWritable(long timeout); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 26c405c0134..f9bbd66d40a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -1017,13 +1017,14 @@ private int sendSessionSendContinuationMessage(Channel channel, final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout()); final long startFlowControl = System.nanoTime(); try { - final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis); + final boolean isWritable = connection.blockUntilWritable(blockingCallTimeoutMillis); if (!isWritable) { final long endFlowControl = System.nanoTime(); final long elapsedFlowControl = endFlowControl - startFlowControl; final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl); ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage(); - logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]"); + logger.debugf("try to write %d bytes after blocked %d ms on a not writable connection: [%s]", + expectedEncodeSize, elapsedMillis, connection.getID()); } if (requiresResponse) { // When sending it blocking, only the last chunk will be blocking. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 418e3f150b9..cb64d3ddccf 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -244,8 +244,8 @@ public void destroy() { } @Override - public boolean blockUntilWritable(int size, long timeout) { - return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS); + public boolean blockUntilWritable(long timeout) { + return transportConnection.blockUntilWritable(timeout, TimeUnit.MILLISECONDS); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 8653469c2e6..3e9806c756a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -140,10 +140,6 @@ private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark return writtenBytes; } - public final int pendingWritesOnChannel() { - return batchBufferSize(this.channel, this.writeBufferHighWaterMark); - } - public final Channel getNettyChannel() { return channel; } @@ -316,7 +312,7 @@ private void checkConnectionState() { } @Override - public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + public final boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) { checkConnectionState(); final boolean isAllowedToBlock = isAllowedToBlock(); if (!isAllowedToBlock) { @@ -331,7 +327,7 @@ public final boolean blockUntilWritable(final int requiredCapacity, final long t if (logger.isDebugEnabled()) { logger.debug("Calling blockUntilWritable using a thread where it's not allowed"); } - return canWrite(requiredCapacity); + return channel.isWritable(); } else { final long timeoutNanos = timeUnit.toNanos(timeout); final long deadline = System.nanoTime() + timeoutNanos; @@ -345,7 +341,7 @@ public final boolean blockUntilWritable(final int requiredCapacity, final long t parkNanos = 1000L; } boolean canWrite; - while (!(canWrite = canWrite(requiredCapacity)) && (System.nanoTime() - deadline) < 0) { + while (!(canWrite = channel.isWritable()) && (System.nanoTime() - deadline) < 0) { //periodically check the connection state checkConnectionState(); LockSupport.parkNanos(parkNanos); @@ -360,19 +356,6 @@ private boolean isAllowedToBlock() { return !inEventLoop; } - private boolean canWrite(final int requiredCapacity) { - //evaluate if the write request could be taken: - //there is enough space in the write buffer? - final long totalPendingWrites = this.pendingWritesOnChannel(); - final boolean canWrite; - if (requiredCapacity > this.writeBufferHighWaterMark) { - canWrite = totalPendingWrites == 0; - } else { - canWrite = (totalPendingWrites + requiredCapacity) <= this.writeBufferHighWaterMark; - } - return canWrite; - } - private Object getFileObject(FileChannel fileChannel, long offset, int dataSize) { if (USE_FILE_REGION && channel.pipeline().get(SslHandler.class) == null) { return new DefaultFileRegion(fileChannel, offset, dataSize) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index 68261bafe8d..26dae1164c4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -47,18 +47,16 @@ public interface Connection { boolean isOpen(); /** - * Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses. - * The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control - * only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers. + * Causes the current thread to wait until the connection can enqueue a write request unless the specified waiting time elapses. + * The available capacity of the connection could change concurrently hence this method is not suitable to perform precise flow-control. * If the current thread is not allowed to block the timeout will be ignored dependently on the connection type. * - * @param requiredCapacity the capacity in bytes to be enqueued * @param timeout the maximum time to wait * @param timeUnit the time unit of the timeout argument * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise * @throws IllegalStateException if the connection is closed */ - default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + default boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) { return true; } diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java index a318a0fb8c5..9272a5e015f 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -238,7 +238,7 @@ public ActiveMQPrincipal getDefaultActiveMQPrincipal() { } @Override - public boolean blockUntilWritable(int size, long timeout) { + public boolean blockUntilWritable(long timeout) { return false; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index fe364c06e38..f222578a583 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -378,7 +378,7 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp replicationStream.execute(() -> { if (enabled) { pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); + flowControl(); replicatingChannel.send(packet); } else { packet.release(); @@ -402,7 +402,7 @@ private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage sy if (enabled) { try { pendingTokens.add(repliToken); - flowControl(syncFileMessage.expectedEncodeSize()); + flowControl(); if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(), syncFileMessage.getOffset(), syncFileMessage.getDataSize(), @@ -426,8 +426,8 @@ private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage sy * This was written as a refactoring of sendReplicatePacket. * In case you refactor this in any way, this method must hold a lock on replication lock. . */ - private boolean flowControl(int size) { - boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout); + private boolean flowControl() { + boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(timeout); if (!flowWorked) { try { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index 26904932636..e5369301150 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -112,7 +112,7 @@ public void throwsExceptionOnBlockUntilWritableIfClosed() { conn.close(); //to make sure the channel is closed it needs to run the pending tasks channel.runPendingTasks(); - conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS); + conn.blockUntilWritable(0, TimeUnit.NANOSECONDS); } @Test From 923fccb986375d3297e2d87d8c12c655903f1b8f Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 4 Nov 2019 11:40:35 +0100 Subject: [PATCH 5/6] ARTEMIS-2336 Control backlog of un-flushed Netty writes --- .../artemis/core/protocol/core/Channel.java | 10 ++ .../core/protocol/core/impl/ChannelImpl.java | 29 +++++- .../artemis/core/paging/PagingStore.java | 5 +- .../core/paging/impl/PagingStoreImpl.java | 6 +- .../impl/journal/JournalStorageManager.java | 23 +++-- .../core/replication/ReplicationManager.java | 91 ++++++++++++------- .../failover/LiveCrashOnBackupSyncTest.java | 6 +- .../cluster/util/BackupSyncDelay.java | 11 +++ .../storage/PersistMultiThreadTest.java | 4 +- 9 files changed, 134 insertions(+), 51 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 4c3b1a943f6..a7433f6b582 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -67,6 +67,16 @@ public interface Channel { */ boolean send(Packet packet); + /** + * Sends a packet on this channel. + * + * @param packet the packet to send + * @param callback callback after send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean send(Packet packet, Callback callback); + /** * Sends a packet and file on this channel. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 352a5bcb072..1370968f421 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -336,9 +336,16 @@ private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) { return buffer; } - // This must never called by more than one thread concurrently private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { + return send(packet, reconnectID, flush, batch, null); + } + + // This must never called by more than one thread concurrently + private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch, Callback callback) { if (invokeInterceptors(packet, interceptors, connection) != null) { + if (callback != null) { + callback.done(false); + } return false; } @@ -348,19 +355,31 @@ private boolean send(final Packet packet, final int reconnectID, final boolean f // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover try { - connection.getTransportConnection().write(buffer, flush, batch); + if (callback == null) { + connection.getTransportConnection().write(buffer, flush, batch); + } else { + connection.getTransportConnection().write(buffer, flush, batch, (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess())); + } } catch (Throwable t) { //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. //The client would get still know about this as the exception bubbles up the call stack instead. if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { responseAsyncCache.remove(packet.getCorrelationID()); } + if (callback != null) { + callback.done(false); + } throw t; } return true; } } + @Override + public boolean send(Packet packet, Callback callback) { + return send(packet, -1, false, false, callback); + } + @Override public boolean send(Packet packet, FileChannel fileChannel, @@ -368,6 +387,9 @@ public boolean send(Packet packet, int dataSize, Callback callback) { if (invokeInterceptors(packet, interceptors, connection) != null) { + if (callback != null) { + callback.done(false); + } return false; } @@ -385,6 +407,9 @@ public boolean send(Packet packet, if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { responseAsyncCache.remove(packet.getCorrelationID()); } + if (callback != null) { + callback.done(false); + } throw t; } return true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 275805ec2bd..8a0ad93ac73 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -178,9 +178,12 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener * * @param replicator * @param pageIds + * @param flushFileTimeoutMillis * @throws Exception */ - void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception; + void sendPages(ReplicationManager replicator, + Collection pageIds, + long flushFileTimeoutMillis) throws Exception; /** * This method will disable cleanup of pages. No page will be deleted after this call. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 452be16e737..322ae20e5f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1185,7 +1185,9 @@ public Collection getCurrentIds() throws Exception { } @Override - public void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception { + public void sendPages(ReplicationManager replicator, + Collection pageIds, + long flushFileTimeoutMillis) throws Exception { final SequentialFileFactory factory = fileFactory; for (Integer id : pageIds) { SequentialFile sFile = factory.createSequentialFile(createFileName(id)); @@ -1193,7 +1195,7 @@ public void sendPages(ReplicationManager replicator, Collection pageIds continue; } ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size()); - replicator.syncPages(sFile, id, getAddress()); + replicator.syncPages(sFile, id, getAddress()).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index e5da1d9a485..f36cb5672bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -595,11 +595,13 @@ public SequentialFile createFileForLargeMessage(final long messageID, LargeMessa /** * Send an entire journal file to a replicating backup server. */ - private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception { + private void sendJournalFile(JournalFile[] journalFiles, + JournalContent type, + long flushFileTimeoutMillis) throws Exception { for (JournalFile jf : journalFiles) { if (!started) return; - replicator.syncJournalFile(jf, type); + replicator.syncJournalFile(jf, type).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS); } } @@ -688,10 +690,10 @@ public void startReplication(ReplicationManager replicationManager, storageManagerLock.writeLock().unlock(); } - sendJournalFile(messageFiles, JournalContent.MESSAGES); - sendJournalFile(bindingsFiles, JournalContent.BINDINGS); - sendLargeMessageFiles(pendingLargeMessages); - sendPagesToBackup(pageFilesToSync, pagingManager); + sendJournalFile(messageFiles, JournalContent.MESSAGES, initialReplicationSyncTimeout); + sendJournalFile(bindingsFiles, JournalContent.BINDINGS, initialReplicationSyncTimeout); + sendLargeMessageFiles(pendingLargeMessages, initialReplicationSyncTimeout); + sendPagesToBackup(pageFilesToSync, pagingManager, initialReplicationSyncTimeout); storageManagerLock.writeLock().lock(); try { @@ -714,7 +716,7 @@ public void startReplication(ReplicationManager replicationManager, } } - private void sendLargeMessageFiles(final Map> pendingLargeMessages) throws Exception { + private void sendLargeMessageFiles(final Map> pendingLargeMessages, long flushFileTimeoutMillis) throws Exception { Iterator>> iter = pendingLargeMessages.entrySet().iterator(); while (started && iter.hasNext()) { Map.Entry> entry = iter.next(); @@ -725,7 +727,7 @@ private void sendLargeMessageFiles(final Map> pendingLa if (!seqFile.exists()) continue; if (replicator != null) { - replicator.syncLargeMessageFile(seqFile, size, id); + replicator.syncLargeMessageFile(seqFile, size, id).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS); } else { throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull(); } @@ -792,12 +794,13 @@ private Map> recoverPendingLargeMessages() throws Excep * @throws Exception */ private void sendPagesToBackup(Map> pageFilesToSync, - PagingManager manager) throws Exception { + PagingManager manager, + long flushFileTimeoutMillis) throws Exception { for (Map.Entry> entry : pageFilesToSync.entrySet()) { if (!started) return; PagingStore store = manager.getPageStore(entry.getKey()); - store.sendPages(replicator, entry.getValue()); + store.sendPages(replicator, entry.getValue(), flushFileTimeoutMillis); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index f222578a583..ff0e85a2553 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.replication; +import java.io.File; +import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.ArrayList; @@ -23,6 +25,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -389,10 +392,13 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp return repliToken; } - private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) { + private void sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk, CompletableFuture flushed) { if (!enabled) { syncFileMessage.release(); - return null; + if (flushed != null) { + flushed.completeExceptionally(new IllegalStateException("ReplicationManager wasn't enabled!")); + } + return; } final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory); @@ -403,12 +409,26 @@ private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage sy try { pendingTokens.add(repliToken); flowControl(); + Channel.Callback callback = null; + if (flushed != null || lastChunk) { + callback = success -> { + if (lastChunk) { + syncFileMessage.release(); + } + if (flushed != null) { + if (success) { + flushed.complete(null); + } else { + flushed.completeExceptionally(new IOException("The file hasn't been flushed to the network")); + } + } + }; + } if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(), - syncFileMessage.getOffset(), syncFileMessage.getDataSize(), - lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); + syncFileMessage.getOffset(), syncFileMessage.getDataSize(), callback); } else { - replicatingChannel.send(syncFileMessage); + replicatingChannel.send(syncFileMessage, callback); } } catch (Exception e) { syncFileMessage.release(); @@ -418,8 +438,6 @@ private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage sy repliToken.replicationDone(); } }); - - return repliToken; } /** @@ -527,29 +545,23 @@ public int getEncodeSize() { * @throws ActiveMQException * @throws Exception */ - public void syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception { - if (!enabled) { - return; - } + public CompletableFuture syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception { SequentialFile file = jf.getFile().cloneFile(); try { ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size()); - sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE); + return sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE); } finally { if (file.isOpen()) file.close(); } } - public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception { - if (enabled) { - sendLargeFile(null, null, id, file, size); - } + public CompletableFuture syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception { + return sendLargeFile(null, null, id, file, size); } - public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception { - if (enabled) - sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); + public CompletableFuture syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception { + return sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); } private class FlushAction implements Runnable { @@ -573,10 +585,12 @@ public void run() { private void sendEmptyFile(AbstractJournalStorageManager.JournalContent content, SimpleString pageStore, final long id, - String fileName) throws Exception { + String fileName, + CompletableFuture onFlushed) throws Exception { logger.debugf("sending empty file %s", fileName); final FlushAction action = new FlushAction(); - sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, null, null, 0, 0), true); + sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, null, + null, 0, 0), true, onFlushed); flushReplicationStream(action); } @@ -590,23 +604,25 @@ private void sendEmptyFile(AbstractJournalStorageManager.JournalContent content, * @param maxBytesToSend maximum number of bytes to read and send from the file * @throws Exception */ - private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, - SimpleString pageStore, - final long id, - SequentialFile file, - long maxBytesToSend) throws Exception { + private CompletableFuture sendLargeFile(AbstractJournalStorageManager.JournalContent content, + SimpleString pageStore, + final long id, + SequentialFile file, + long maxBytesToSend) throws Exception { if (!enabled) - return; + return CompletableFuture.completedFuture(null); if (!file.isOpen()) { file.open(); } final int size = 1024 * 1024; final long fileSize = file.size(); + final File javaFile = file.getJavaFile(); if (fileSize == 0) { final String fileName = file.getFileName(); file.close(); - sendEmptyFile(content, pageStore, id, fileName); - return; + final CompletableFuture onFlushed = new CompletableFuture<>(); + sendEmptyFile(content, pageStore, id, fileName, onFlushed); + return onFlushed; } int flowControlSize = 10; @@ -616,8 +632,9 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, long offset = 0; RandomAccessFile raf = null; FileChannel fileChannel = null; + CompletableFuture onFlushed = null; try { - raf = new RandomAccessFile(file.getJavaFile(), "r"); + raf = new RandomAccessFile(javaFile, "r"); fileChannel = raf.getChannel(); while (true) { long chunkSize = Math.min(size, fileSize - offset); @@ -630,12 +647,18 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, maxBytesToSend = maxBytesToSend - chunkSize; } } - logger.debug("sending " + toSend + " bytes on file " + file.getFileName()); + logger.debugf("sending %d bytes on file %s", toSend, file.getFileName()); // sending -1 or 0 bytes will close the file at the backup // We cannot simply send everything of a file through the executor, // otherwise we would run out of memory. // so we don't use the executor here - sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize); + final boolean lastChunk = offset + toSend == fileSize; + if (lastChunk && (toSend == 0 || maxBytesToSend == 0)) { + onFlushed = new CompletableFuture<>(); + } else { + onFlushed = null; + } + sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), lastChunk, onFlushed); packetsSent++; offset += toSend; @@ -646,10 +669,12 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, break; } flushReplicationStream(action); - + assert onFlushed != null; + return onFlushed; } catch (Exception e) { if (raf != null) raf.close(); + onFlushed.completeExceptionally(e); throw e; } finally { if (file.isOpen()) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java index c7887002387..6f1c9142cae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java @@ -257,9 +257,11 @@ class DelayPagingStoreImpl extends PagingStoreImpl { } @Override - public void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception { + public void sendPages(ReplicationManager replicator, + Collection pageIds, + long flushFileTimeoutMillis) throws Exception { //in order to extend the synchronization time Thread.sleep(20 * 1000); - super.sendPages(replicator, pageIds); + super.sendPages(replicator, pageIds, flushFileTimeoutMillis); } } \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index cb91f7c0e4b..3f0b782d0ea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -212,8 +212,19 @@ public boolean send(Packet packet) { return true; } + @Override + public boolean send(Packet packet, Callback callback) { + if (callback != null) { + callback.done(true); + } + return true; + } + @Override public boolean send(Packet packet, FileChannel fileChannel, long offset, int dataSize, Callback callback) { + if (callback != null) { + callback.done(true); + } return true; } diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 30aa4c361e5..16a304c9545 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -441,7 +441,9 @@ public Collection getCurrentIds() throws Exception { } @Override - public void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception { + public void sendPages(ReplicationManager replicator, + Collection pageIds, + long flushFileTimeoutMillis) throws Exception { } From 1d16abf2911f9ba2e84dbc06e166cf70c4fb197f Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 4 Nov 2019 12:20:47 +0100 Subject: [PATCH 6/6] ARTEMIS-2336 Fixed test due to ReplicationManager API changes --- .../impl/journal/JournalStorageManagerTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java index a0580941bea..b9661a1bfcf 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; @@ -54,6 +55,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assume.assumeTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -124,8 +126,14 @@ public void testStopReplicationDoesNotDeadlockWhileStopping() throws Exception { manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader); final ReplicationManager replicationManager = mock(ReplicationManager.class); final PagingManager pagingManager = mock(PagingManager.class); + when(replicationManager.syncJournalFile(any(JournalFile.class), any(AbstractJournalStorageManager.JournalContent.class))) + .thenReturn(CompletableFuture.completedFuture(null)); + when(replicationManager.syncLargeMessageFile(any(SequentialFile.class), anyLong(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(replicationManager.syncPages(any(SequentialFile.class), anyLong(), any(SimpleString.class))) + .thenReturn(CompletableFuture.completedFuture(null)); when(pagingManager.getStoreNames()).thenReturn(new SimpleString[0]); - manager.startReplication(replicationManager, pagingManager, UUID.randomUUID().toString(), false, 0); + manager.startReplication(replicationManager, pagingManager, UUID.randomUUID().toString(), false, Long.MAX_VALUE); final LargeServerMessage largeMessage = manager.createLargeMessage(manager.generateID() + 1, new CoreMessage()); largeMessage.setDurable(true); when(replicationManager.isSynchronizing()).thenReturn(true);