From d165fa49d09a60087cbd4dbfc5c93467ba0a1ec7 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque Date: Wed, 16 Oct 2019 12:14:17 +0600 Subject: [PATCH] setTimeoutInfinite in xread/xreadGroup (#2059) --- .../java/redis/clients/jedis/BinaryJedis.java | 18 ++++++++--- src/main/java/redis/clients/jedis/Jedis.java | 30 +++++++++++-------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 1a3d95f730..3a3ee572a1 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -3320,9 +3320,9 @@ protected static byte[][] getParamsWithBinary(List keys, List ar @Override public Object eval(final byte[] script, final byte[] keyCount, final byte[]... params) { checkIsInMultiOrPipeline(); + client.eval(script, keyCount, params); client.setTimeoutInfinite(); try { - client.eval(script, keyCount, params); return client.getOne(); } finally { client.rollbackTimeout(); @@ -3352,9 +3352,9 @@ public Object evalsha(final byte[] sha1, final List keys, final List xread(int count, long block, Map streams) { checkIsInMultiOrPipeline(); client.xread(count, block, streams); - return client.getBinaryMultiBulkReply(); + client.setTimeoutInfinite(); + try { + return client.getBinaryMultiBulkReply(); + } finally { + client.rollbackTimeout(); + } } @Override @@ -3959,7 +3964,12 @@ public List xreadGroup(byte[] groupname, byte[] consumer, int count, lon Map streams) { checkIsInMultiOrPipeline(); client.xreadGroup(groupname, consumer, count, block, noAck, streams); - return client.getBinaryMultiBulkReply(); + client.setTimeoutInfinite(); + try { + return client.getBinaryMultiBulkReply(); + } finally { + client.rollbackTimeout(); + } } @Override diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index c24e57452f..2353c690db 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -2790,7 +2790,6 @@ public void subscribe(final JedisPubSub jedisPubSub, final String... channels) { @Override public Long publish(final String channel, final String message) { checkIsInMultiOrPipeline(); - connect(); client.publish(channel, message); return client.getIntegerReply(); } @@ -3804,20 +3803,25 @@ public List>> xreadGroup(final String groupname, final boolean noAck, final Entry... streams) { checkIsInMultiOrPipeline(); client.xreadGroup(groupname, consumer, count, block, noAck, streams); + client.setTimeoutInfinite(); - List streamsEntries = client.getObjectMultiBulkReply(); - if(streamsEntries == null) { - return null; - } - - List>> result = new ArrayList<>(streamsEntries.size()); - for(Object streamObj : streamsEntries) { - List stream = (List)streamObj; - String streamId = SafeEncoder.encode((byte[])stream.get(0)); - List streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1)); - result.add(new AbstractMap.SimpleEntry>(streamId, streamEntries)); + try { + List streamsEntries = client.getObjectMultiBulkReply(); + if(streamsEntries == null) { + return null; + } + + List>> result = new ArrayList<>(streamsEntries.size()); + for(Object streamObj : streamsEntries) { + List stream = (List)streamObj; + String streamId = SafeEncoder.encode((byte[])stream.get(0)); + List streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1)); + result.add(new AbstractMap.SimpleEntry>(streamId, streamEntries)); + } + return result; + } finally { + client.rollbackTimeout(); } - return result; } @Override