Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-2336 Use zero copy to replicate journal/page/large message file (AGAIN) #2845

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core;

import java.nio.channels.FileChannel;
import java.util.concurrent.locks.Lock;

import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand Down Expand Up @@ -66,6 +67,29 @@ public interface Channel {
*/
boolean send(Packet packet);

/**
* Sends a packet on this channel and reports the outcome of the send through {@code callback}.
* <p>
* In the {@code ChannelImpl} implementation the callback receives {@code false} when an outgoing
* interceptor rejects the packet or when the write throws; otherwise it receives the result of
* the completed write.
*
* @param packet the packet to send
* @param callback callback invoked after the send with its outcome
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
* successful
*/
boolean send(Packet packet, Callback callback);

/**
* Sends a packet followed by a region of a file on this channel.
* <p>
* In the {@code ChannelImpl} implementation the encoded packet is written first and the file
* region is written right after it; the callback is attached to the file write.
*
* @param packet the packet to send
* @param fileChannel the file channel retrieved from raf (presumably a
* {@code RandomAccessFile}; confirm against the callers)
* @param offset the position of the raf (presumably where the transferred region starts;
* confirm against the transport implementation)
* @param dataSize the data size to send
* @param callback callback invoked after the send with its outcome
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
* successful
*/
boolean send(Packet packet, FileChannel fileChannel, long offset, int dataSize, Callback callback);

/**
* Sends a packet on this channel.
*
Expand Down Expand Up @@ -247,4 +271,8 @@ public interface Channel {
* @param transferring whether the channel is transferring
*/
void setTransferring(boolean transferring);

/**
* Completion callback for the {@code send} variants that accept one.
* <p>
* Declared as a functional interface so that callers can pass a lambda or method reference and
* so that the compiler rejects an accidental second abstract method.
*/
@FunctionalInterface
interface Callback {

/**
* Notifies the caller about the outcome of a send.
*
* @param success {@code true} if the send completed successfully, {@code false} otherwise
*/
void done(boolean success);
}
}
Expand Up @@ -134,11 +134,9 @@ default boolean isVersionNewFQQN() {
ActiveMQPrincipal getDefaultActiveMQPrincipal();

/**
*
* @param size size we are trying to write
* @param timeout
* @return
* @throws IllegalStateException if the connection is closed
*/
boolean blockUntilWritable(int size, long timeout);
boolean blockUntilWritable(long timeout);
}
Expand Up @@ -1017,13 +1017,14 @@ private int sendSessionSendContinuationMessage(Channel channel,
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
final long startFlowControl = System.nanoTime();
try {
final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
final boolean isWritable = connection.blockUntilWritable(blockingCallTimeoutMillis);
if (!isWritable) {
final long endFlowControl = System.nanoTime();
final long elapsedFlowControl = endFlowControl - startFlowControl;
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
logger.debugf("try to write %d bytes after blocked %d ms on a not writable connection: [%s]",
expectedEncodeSize, elapsedMillis, connection.getID());
}
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;

import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -25,6 +26,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
Expand All @@ -39,6 +41,7 @@
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.jboss.logging.Logger;
Expand Down Expand Up @@ -148,6 +151,11 @@ public ChannelImpl(final CoreRemotingConnection connection,
}

this.interceptors = interceptors;
//zero copy transfer is initialized only for replication channels
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clebertsuconic This one is ugly I know :)
We can find a better way together ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... this looks like a hack to me! When is sendFiles used without being the replica channel?

Copy link
Contributor Author

@franz1981 franz1981 Sep 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now it is not, and I've added an assert to make the test suite fail when it is not properly configured, but we can always configure the Netty pipeline upfront (right after the SSL handler) to handle it without affecting normal usage, if that is clearer.

if (id == CHANNEL_ID.REPLICATION.id && connection.getTransportConnection() instanceof NettyConnection) {
final NettyConnection nettyConnection = (NettyConnection) connection.getTransportConnection();
nettyConnection.initializeZeroCopyTransfer();
}
}

@Override
Expand Down Expand Up @@ -274,73 +282,134 @@ private void waitForFailOver(String timeoutMsg) {
}
}

// This must never be called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
return false;
private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) {
packet.setChannelID(id);

if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}

synchronized (sendLock) {
packet.setChannelID(id);
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
}

ActiveMQBuffer buffer = packet.encode(connection);

if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
lock.lock();

try {
if (failingOver) {
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
}

if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
// Sanity check
if (transferring) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
}

ActiveMQBuffer buffer = packet.encode(connection);
if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
}

lock.lock();
} finally {
lock.unlock();
}

try {
if (failingOver) {
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
}
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
}

// Sanity check
if (transferring) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
}
checkReconnectID(reconnectID);

if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
Thread.sleep(1);
} catch (Exception e) {
// Ignore
}

} finally {
lock.unlock();
}
}

if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
return buffer;
}

private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
return send(packet, reconnectID, flush, batch, null);
}

// This must never be called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch, Callback callback) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
if (callback != null) {
callback.done(false);
}
return false;
}

checkReconnectID(reconnectID);
synchronized (sendLock) {
ActiveMQBuffer buffer = beforeSend(packet, reconnectID);

//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
Thread.sleep(1);
} catch (Exception e) {
// Ignore
}
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
try {
if (callback == null) {
connection.getTransportConnection().write(buffer, flush, batch);
} else {
connection.getTransportConnection().write(buffer, flush, batch, (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess()));
}
} catch (Throwable t) {
//If the write throws, we must remove the packet from the cache, otherwise the cache could fill up.
//The client still gets to know about the failure because the exception bubbles up the call stack.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
responseAsyncCache.remove(packet.getCorrelationID());
}
if (callback != null) {
callback.done(false);
}
throw t;
}
return true;
}
}

@Override
public boolean send(Packet packet, Callback callback) {
return send(packet, -1, false, false, callback);
}

@Override
public boolean send(Packet packet,
FileChannel fileChannel,
long offset,
int dataSize,
Callback callback) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
if (callback != null) {
callback.done(false);
}
return false;
}

synchronized (sendLock) {
ActiveMQBuffer buffer = beforeSend(packet, -1);

// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
try {
connection.getTransportConnection().write(buffer, flush, batch);
connection.getTransportConnection().write(buffer);
connection.getTransportConnection().write(fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess()));
} catch (Throwable t) {
//If the write throws, we must remove the packet from the cache, otherwise the cache could fill up.
//The client still gets to know about the failure because the exception bubbles up the call stack.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
responseAsyncCache.remove(packet.getCorrelationID());
}
if (callback != null) {
callback.done(false);
}
throw t;
}
return true;
Expand Down
Expand Up @@ -336,7 +336,11 @@ protected void encodeHeader(ActiveMQBuffer buffer) {
}

protected void encodeSize(ActiveMQBuffer buffer) {
size = buffer.writerIndex();
encodeSize(buffer, buffer.writerIndex());
}

protected void encodeSize(ActiveMQBuffer buffer, int size) {
this.size = size;

// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
Expand All @@ -345,9 +349,10 @@ protected void encodeSize(ActiveMQBuffer buffer) {
}

protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
return createPacket(connection, expectedEncodeSize());
}

int size = expectedEncodeSize();

protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) {
if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(size));
} else {
Expand Down
Expand Up @@ -244,8 +244,8 @@ public void destroy() {
}

@Override
public boolean blockUntilWritable(int size, long timeout) {
return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS);
public boolean blockUntilWritable(long timeout) {
return transportConnection.blockUntilWritable(timeout, TimeUnit.MILLISECONDS);
}

@Override
Expand Down