Skip to content

Commit

Permalink
Shutdown ExecutorService only if it created on-demand
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Aug 21, 2023
1 parent a286ebd commit 804c180
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import redis.clients.jedis.graph.GraphCommandObjects;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.util.IOUtils;
import redis.clients.jedis.util.KeyValue;
import redis.clients.jedis.util.MultiNodePipelineThreadPool;

public abstract class MultiNodePipelineBase extends PipelineBase
Expand Down Expand Up @@ -93,11 +94,14 @@ public void close() {
}
}

private ExecutorService getThreadPool() {
/**
* Get thread pool and whether to close it or not.
*/
private KeyValue<ExecutorService, Boolean> getThreadPool() {
ExecutorService threadPool = MultiNodePipelineThreadPool.getThreadPool();
if (threadPool != null) return threadPool;
if (threadPool != null) return KeyValue.of(threadPool, false);

return Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
return KeyValue.of(Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS), true);
}

@Override
Expand All @@ -107,7 +111,7 @@ public final void sync() {
}
syncing = true;

ExecutorService executorService = getThreadPool();
KeyValue<ExecutorService, Boolean> executorService = getThreadPool();

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
Expand All @@ -118,7 +122,7 @@ public final void sync() {
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
try {
executorService.submit(() -> {
executorService.getKey().submit(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
Expand Down Expand Up @@ -146,7 +150,9 @@ public final void sync() {
log.error("Thread is interrupted during sync.", e);
}

executorService.shutdownNow();
if (executorService.getValue()) {
executorService.getKey().shutdownNow();
}

syncing = false;
}
Expand Down

0 comments on commit 804c180

Please sign in to comment.