Skip to content

Commit

Permalink
ARTEMIS-2336 Control backlog of un-flushed Netty writes
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Nov 4, 2019
1 parent e434f18 commit 923fccb
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 51 deletions.
Expand Up @@ -67,6 +67,16 @@ public interface Channel {
*/
boolean send(Packet packet);

/**
* Sends a packet on this channel.
*
* @param packet the packet to send
* @param callback callback after send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
* successful
*/
boolean send(Packet packet, Callback callback);

/**
* Sends a packet and file on this channel.
*
Expand Down
Expand Up @@ -336,9 +336,16 @@ private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) {
return buffer;
}

// This must never called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
return send(packet, reconnectID, flush, batch, null);
}

// This must never called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch, Callback callback) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
if (callback != null) {
callback.done(false);
}
return false;
}

Expand All @@ -348,26 +355,41 @@ private boolean send(final Packet packet, final int reconnectID, final boolean f
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
try {
connection.getTransportConnection().write(buffer, flush, batch);
if (callback == null) {
connection.getTransportConnection().write(buffer, flush, batch);
} else {
connection.getTransportConnection().write(buffer, flush, batch, (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess()));
}
} catch (Throwable t) {
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
//The client would get still know about this as the exception bubbles up the call stack instead.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
responseAsyncCache.remove(packet.getCorrelationID());
}
if (callback != null) {
callback.done(false);
}
throw t;
}
return true;
}
}

@Override
public boolean send(Packet packet, Callback callback) {
return send(packet, -1, false, false, callback);
}

@Override
public boolean send(Packet packet,
FileChannel fileChannel,
long offset,
int dataSize,
Callback callback) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
if (callback != null) {
callback.done(false);
}
return false;
}

Expand All @@ -385,6 +407,9 @@ public boolean send(Packet packet,
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
responseAsyncCache.remove(packet.getCorrelationID());
}
if (callback != null) {
callback.done(false);
}
throw t;
}
return true;
Expand Down
Expand Up @@ -178,9 +178,12 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
*
* @param replicator
* @param pageIds
* @param flushFileTimeoutMillis
* @throws Exception
*/
void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception;
void sendPages(ReplicationManager replicator,
Collection<Integer> pageIds,
long flushFileTimeoutMillis) throws Exception;

/**
* This method will disable cleanup of pages. No page will be deleted after this call.
Expand Down
Expand Up @@ -1185,15 +1185,17 @@ public Collection<Integer> getCurrentIds() throws Exception {
}

@Override
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
public void sendPages(ReplicationManager replicator,
Collection<Integer> pageIds,
long flushFileTimeoutMillis) throws Exception {
final SequentialFileFactory factory = fileFactory;
for (Integer id : pageIds) {
SequentialFile sFile = factory.createSequentialFile(createFileName(id));
if (!sFile.exists()) {
continue;
}
ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size());
replicator.syncPages(sFile, id, getAddress());
replicator.syncPages(sFile, id, getAddress()).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS);
}
}

Expand Down
Expand Up @@ -595,11 +595,13 @@ public SequentialFile createFileForLargeMessage(final long messageID, LargeMessa
/**
* Send an entire journal file to a replicating backup server.
*/
private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception {
private void sendJournalFile(JournalFile[] journalFiles,
JournalContent type,
long flushFileTimeoutMillis) throws Exception {
for (JournalFile jf : journalFiles) {
if (!started)
return;
replicator.syncJournalFile(jf, type);
replicator.syncJournalFile(jf, type).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -688,10 +690,10 @@ public void startReplication(ReplicationManager replicationManager,
storageManagerLock.writeLock().unlock();
}

sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
sendLargeMessageFiles(pendingLargeMessages);
sendPagesToBackup(pageFilesToSync, pagingManager);
sendJournalFile(messageFiles, JournalContent.MESSAGES, initialReplicationSyncTimeout);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS, initialReplicationSyncTimeout);
sendLargeMessageFiles(pendingLargeMessages, initialReplicationSyncTimeout);
sendPagesToBackup(pageFilesToSync, pagingManager, initialReplicationSyncTimeout);

storageManagerLock.writeLock().lock();
try {
Expand All @@ -714,7 +716,7 @@ public void startReplication(ReplicationManager replicationManager,
}
}

private void sendLargeMessageFiles(final Map<Long, Pair<String, Long>> pendingLargeMessages) throws Exception {
private void sendLargeMessageFiles(final Map<Long, Pair<String, Long>> pendingLargeMessages, long flushFileTimeoutMillis) throws Exception {
Iterator<Map.Entry<Long, Pair<String, Long>>> iter = pendingLargeMessages.entrySet().iterator();
while (started && iter.hasNext()) {
Map.Entry<Long, Pair<String, Long>> entry = iter.next();
Expand All @@ -725,7 +727,7 @@ private void sendLargeMessageFiles(final Map<Long, Pair<String, Long>> pendingLa
if (!seqFile.exists())
continue;
if (replicator != null) {
replicator.syncLargeMessageFile(seqFile, size, id);
replicator.syncLargeMessageFile(seqFile, size, id).get(flushFileTimeoutMillis, TimeUnit.MILLISECONDS);
} else {
throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull();
}
Expand Down Expand Up @@ -792,12 +794,13 @@ private Map<Long, Pair<String, Long>> recoverPendingLargeMessages() throws Excep
* @throws Exception
*/
private void sendPagesToBackup(Map<SimpleString, Collection<Integer>> pageFilesToSync,
PagingManager manager) throws Exception {
PagingManager manager,
long flushFileTimeoutMillis) throws Exception {
for (Map.Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet()) {
if (!started)
return;
PagingStore store = manager.getPageStore(entry.getKey());
store.sendPages(replicator, entry.getValue());
store.sendPages(replicator, entry.getValue(), flushFileTimeoutMillis);
}
}

Expand Down
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.activemq.artemis.core.replication;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -389,10 +392,13 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp
return repliToken;
}

private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) {
private void sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk, CompletableFuture<?> flushed) {
if (!enabled) {
syncFileMessage.release();
return null;
if (flushed != null) {
flushed.completeExceptionally(new IllegalStateException("ReplicationManager wasn't enabled!"));
}
return;
}

final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory);
Expand All @@ -403,12 +409,26 @@ private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage sy
try {
pendingTokens.add(repliToken);
flowControl();
Channel.Callback callback = null;
if (flushed != null || lastChunk) {
callback = success -> {
if (lastChunk) {
syncFileMessage.release();
}
if (flushed != null) {
if (success) {
flushed.complete(null);
} else {
flushed.completeExceptionally(new IOException("The file hasn't been flushed to the network"));
}
}
};
}
if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) {
replicatingChannel.send(syncFileMessage, syncFileMessage.getFileChannel(),
syncFileMessage.getOffset(), syncFileMessage.getDataSize(),
lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null);
syncFileMessage.getOffset(), syncFileMessage.getDataSize(), callback);
} else {
replicatingChannel.send(syncFileMessage);
replicatingChannel.send(syncFileMessage, callback);
}
} catch (Exception e) {
syncFileMessage.release();
Expand All @@ -418,8 +438,6 @@ private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage sy
repliToken.replicationDone();
}
});

return repliToken;
}

/**
Expand Down Expand Up @@ -527,29 +545,23 @@ public int getEncodeSize() {
* @throws ActiveMQException
* @throws Exception
*/
public void syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception {
if (!enabled) {
return;
}
public CompletableFuture<?> syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception {
SequentialFile file = jf.getFile().cloneFile();
try {
ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size());
sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
return sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
} finally {
if (file.isOpen())
file.close();
}
}

public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception {
if (enabled) {
sendLargeFile(null, null, id, file, size);
}
public CompletableFuture<?> syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception {
return sendLargeFile(null, null, id, file, size);
}

public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception {
if (enabled)
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
public CompletableFuture<?> syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception {
return sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
}

private class FlushAction implements Runnable {
Expand All @@ -573,10 +585,12 @@ public void run() {
private void sendEmptyFile(AbstractJournalStorageManager.JournalContent content,
SimpleString pageStore,
final long id,
String fileName) throws Exception {
String fileName,
CompletableFuture<?> onFlushed) throws Exception {
logger.debugf("sending empty file %s", fileName);
final FlushAction action = new FlushAction();
sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, null, null, 0, 0), true);
sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, null,
null, 0, 0), true, onFlushed);
flushReplicationStream(action);
}

Expand All @@ -590,23 +604,25 @@ private void sendEmptyFile(AbstractJournalStorageManager.JournalContent content,
* @param maxBytesToSend maximum number of bytes to read and send from the file
* @throws Exception
*/
private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
SimpleString pageStore,
final long id,
SequentialFile file,
long maxBytesToSend) throws Exception {
private CompletableFuture<?> sendLargeFile(AbstractJournalStorageManager.JournalContent content,
SimpleString pageStore,
final long id,
SequentialFile file,
long maxBytesToSend) throws Exception {
if (!enabled)
return;
return CompletableFuture.completedFuture(null);
if (!file.isOpen()) {
file.open();
}
final int size = 1024 * 1024;
final long fileSize = file.size();
final File javaFile = file.getJavaFile();
if (fileSize == 0) {
final String fileName = file.getFileName();
file.close();
sendEmptyFile(content, pageStore, id, fileName);
return;
final CompletableFuture<?> onFlushed = new CompletableFuture<>();
sendEmptyFile(content, pageStore, id, fileName, onFlushed);
return onFlushed;
}
int flowControlSize = 10;

Expand All @@ -616,8 +632,9 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
long offset = 0;
RandomAccessFile raf = null;
FileChannel fileChannel = null;
CompletableFuture<?> onFlushed = null;
try {
raf = new RandomAccessFile(file.getJavaFile(), "r");
raf = new RandomAccessFile(javaFile, "r");
fileChannel = raf.getChannel();
while (true) {
long chunkSize = Math.min(size, fileSize - offset);
Expand All @@ -630,12 +647,18 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
maxBytesToSend = maxBytesToSend - chunkSize;
}
}
logger.debug("sending " + toSend + " bytes on file " + file.getFileName());
logger.debugf("sending %d bytes on file %s", toSend, file.getFileName());
// sending -1 or 0 bytes will close the file at the backup
// We cannot simply send everything of a file through the executor,
// otherwise we would run out of memory.
// so we don't use the executor here
sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize);
final boolean lastChunk = offset + toSend == fileSize;
if (lastChunk && (toSend == 0 || maxBytesToSend == 0)) {
onFlushed = new CompletableFuture<>();
} else {
onFlushed = null;
}
sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), lastChunk, onFlushed);
packetsSent++;
offset += toSend;

Expand All @@ -646,10 +669,12 @@ private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
break;
}
flushReplicationStream(action);

assert onFlushed != null;
return onFlushed;
} catch (Exception e) {
if (raf != null)
raf.close();
onFlushed.completeExceptionally(e);
throw e;
} finally {
if (file.isOpen())
Expand Down

0 comments on commit 923fccb

Please sign in to comment.