Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

setTimeoutInfinite in xread/xreadGroup #2059

Merged
merged 2 commits into from Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Expand Up @@ -3315,9 +3315,9 @@ protected static byte[][] getParamsWithBinary(List<byte[]> keys, List<byte[]> ar

@Override
public Object eval(final byte[] script, final byte[] keyCount, final byte[]... params) {
client.eval(script, keyCount, params);
client.setTimeoutInfinite();
try {
client.eval(script, keyCount, params);
return client.getOne();
} finally {
client.rollbackTimeout();
Expand Down Expand Up @@ -3346,9 +3346,9 @@ public Object evalsha(final byte[] sha1, final List<byte[]> keys, final List<byt

@Override
public Object evalsha(final byte[] sha1, final int keyCount, final byte[]... params) {
client.evalsha(sha1, keyCount, params);
client.setTimeoutInfinite();
try {
client.evalsha(sha1, keyCount, params);
return client.getOne();
} finally {
client.rollbackTimeout();
Expand Down Expand Up @@ -3945,15 +3945,25 @@ public Long hstrlen(final byte[] key, final byte[] field) {
public List<byte[]> xread(int count, long block, Map<byte[], byte[]> streams) {
checkIsInMultiOrPipeline();
client.xread(count, block, streams);
return client.getBinaryMultiBulkReply();
client.setTimeoutInfinite();
try {
return client.getBinaryMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}

@Override
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck,
Map<byte[], byte[]> streams) {
checkIsInMultiOrPipeline();
client.xreadGroup(groupname, consumer, count, block, noAck, streams);
return client.getBinaryMultiBulkReply();
client.setTimeoutInfinite();
try {
return client.getBinaryMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}

@Override
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/redis/clients/jedis/Jedis.java
Expand Up @@ -2773,9 +2773,9 @@ public String configSet(final String parameter, final String value) {

@Override
public Object eval(final String script, final int keyCount, final String... params) {
client.eval(script, keyCount, params);
client.setTimeoutInfinite();
try {
client.eval(script, keyCount, params);
return getEvalResult();
} finally {
client.rollbackTimeout();
Expand All @@ -2795,7 +2795,6 @@ public void subscribe(final JedisPubSub jedisPubSub, final String... channels) {
@Override
public Long publish(final String channel, final String message) {
checkIsInMultiOrPipeline();
connect();
client.publish(channel, message);
return client.getIntegerReply();
}
Expand Down Expand Up @@ -3793,20 +3792,25 @@ public List<Entry<String, List<StreamEntry>>> xreadGroup(final String groupname,
final boolean noAck, final Entry<String, StreamEntryID>... streams) {
checkIsInMultiOrPipeline();
client.xreadGroup(groupname, consumer, count, block, noAck, streams);
client.setTimeoutInfinite();

List<Object> streamsEntries = client.getObjectMultiBulkReply();
if(streamsEntries == null) {
return null;
}

List<Entry<String, List<StreamEntry>>> result = new ArrayList<>(streamsEntries.size());
for(Object streamObj : streamsEntries) {
List<Object> stream = (List<Object>)streamObj;
String streamId = SafeEncoder.encode((byte[])stream.get(0));
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
result.add(new AbstractMap.SimpleEntry<String, List<StreamEntry>>(streamId, streamEntries));
try {
List<Object> streamsEntries = client.getObjectMultiBulkReply();
if(streamsEntries == null) {
return null;
}

List<Entry<String, List<StreamEntry>>> result = new ArrayList<>(streamsEntries.size());
for(Object streamObj : streamsEntries) {
List<Object> stream = (List<Object>)streamObj;
String streamId = SafeEncoder.encode((byte[])stream.get(0));
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
result.add(new AbstractMap.SimpleEntry<String, List<StreamEntry>>(streamId, streamEntries));
}
return result;
} finally {
client.rollbackTimeout();
}
return result;
}

@Override
Expand Down