Skip to content

Commit

Permalink
setTimeoutInfinite in xread/xreadGroup (#2059)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Oct 16, 2019
1 parent dd2ef2a commit d165fa4
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
18 changes: 14 additions & 4 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Expand Up @@ -3320,9 +3320,9 @@ protected static byte[][] getParamsWithBinary(List<byte[]> keys, List<byte[]> ar
@Override
public Object eval(final byte[] script, final byte[] keyCount, final byte[]... params) {
checkIsInMultiOrPipeline();
client.eval(script, keyCount, params);
client.setTimeoutInfinite();
try {
client.eval(script, keyCount, params);
return client.getOne();
} finally {
client.rollbackTimeout();
Expand Down Expand Up @@ -3352,9 +3352,9 @@ public Object evalsha(final byte[] sha1, final List<byte[]> keys, final List<byt
@Override
public Object evalsha(final byte[] sha1, final int keyCount, final byte[]... params) {
checkIsInMultiOrPipeline();
client.evalsha(sha1, keyCount, params);
client.setTimeoutInfinite();
try {
client.evalsha(sha1, keyCount, params);
return client.getOne();
} finally {
client.rollbackTimeout();
Expand Down Expand Up @@ -3951,15 +3951,25 @@ public Long hstrlen(final byte[] key, final byte[] field) {
public List<byte[]> xread(int count, long block, Map<byte[], byte[]> streams) {
checkIsInMultiOrPipeline();
client.xread(count, block, streams);
return client.getBinaryMultiBulkReply();
client.setTimeoutInfinite();
try {
return client.getBinaryMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}

@Override
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck,
Map<byte[], byte[]> streams) {
checkIsInMultiOrPipeline();
client.xreadGroup(groupname, consumer, count, block, noAck, streams);
return client.getBinaryMultiBulkReply();
client.setTimeoutInfinite();
try {
return client.getBinaryMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}

@Override
Expand Down
30 changes: 17 additions & 13 deletions src/main/java/redis/clients/jedis/Jedis.java
Expand Up @@ -2790,7 +2790,6 @@ public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
@Override
public Long publish(final String channel, final String message) {
checkIsInMultiOrPipeline();
connect();
client.publish(channel, message);
return client.getIntegerReply();
}
Expand Down Expand Up @@ -3804,20 +3803,25 @@ public List<Entry<String, List<StreamEntry>>> xreadGroup(final String groupname,
final boolean noAck, final Entry<String, StreamEntryID>... streams) {
checkIsInMultiOrPipeline();
client.xreadGroup(groupname, consumer, count, block, noAck, streams);
client.setTimeoutInfinite();

List<Object> streamsEntries = client.getObjectMultiBulkReply();
if(streamsEntries == null) {
return null;
}

List<Entry<String, List<StreamEntry>>> result = new ArrayList<>(streamsEntries.size());
for(Object streamObj : streamsEntries) {
List<Object> stream = (List<Object>)streamObj;
String streamId = SafeEncoder.encode((byte[])stream.get(0));
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
result.add(new AbstractMap.SimpleEntry<String, List<StreamEntry>>(streamId, streamEntries));
try {
List<Object> streamsEntries = client.getObjectMultiBulkReply();
if(streamsEntries == null) {
return null;
}

List<Entry<String, List<StreamEntry>>> result = new ArrayList<>(streamsEntries.size());
for(Object streamObj : streamsEntries) {
List<Object> stream = (List<Object>)streamObj;
String streamId = SafeEncoder.encode((byte[])stream.get(0));
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
result.add(new AbstractMap.SimpleEntry<String, List<StreamEntry>>(streamId, streamEntries));
}
return result;
} finally {
client.rollbackTimeout();
}
return result;
}

@Override
Expand Down

0 comments on commit d165fa4

Please sign in to comment.