Skip to content

Commit

Permalink
Backport PubSub handle array of messages (redis#3811)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Apr 9, 2024
1 parent 13abfd1 commit 90cf1e7
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 5 deletions.
8 changes: 6 additions & 2 deletions src/main/java/redis/clients/jedis/BinaryJedisPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ private void process() {
onUnsubscribe(bchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
final byte[] bchannel = (byte[]) reply.get(1);
final byte[] bmesg = (byte[]) reply.get(2);
onMessage(bchannel, bmesg);
final Object mesg = reply.get(2);
if (mesg instanceof List) {
((List<byte[]>) mesg).forEach(bmesg -> onMessage(bchannel, bmesg));
} else {
onMessage(bchannel, (mesg == null) ? null : (byte[]) mesg);
}
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
final byte[] bchannel = (byte[]) reply.get(2);
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/redis/clients/jedis/JedisPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,13 @@ private void process() {
onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
final byte[] bchannel = (byte[]) reply.get(1);
final byte[] bmesg = (byte[]) reply.get(2);
final Object mesg = reply.get(2);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onMessage(strchannel, strmesg);
if (mesg instanceof List) {
((List<byte[]>) mesg).forEach(bmesg -> onMessage(strchannel, SafeEncoder.encode(bmesg)));
} else {
onMessage(strchannel, (mesg == null) ? null : SafeEncoder.encode((byte[]) mesg));
}
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
final byte[] bchannel = (byte[]) reply.get(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static redis.clients.jedis.Protocol.Command.CLIENT;

import java.io.IOException;
import java.net.UnknownHostException;
Expand All @@ -15,6 +16,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.junit.Test;

Expand Down Expand Up @@ -532,4 +534,69 @@ private String makeLargeString(int size) {

return sb.toString();
}

@Test(timeout = 5000)
public void subscribeCacheInvalidateChannel() {
final String cacheInvalidate = "__redis__:invalidate";
final AtomicBoolean onMessage = new AtomicBoolean(false);
final JedisPubSub pubsub = new JedisPubSub() {
@Override public void onMessage(String channel, String message) {
onMessage.set(true);
assertEquals(cacheInvalidate, channel);
if (message != null) {
assertEquals("foo", message);
consumeJedis(j -> j.flushAll());
} else {
unsubscribe(channel);
}
}

@Override public void onSubscribe(String channel, int subscribedChannels) {
assertEquals(cacheInvalidate, channel);
consumeJedis(j -> j.set("foo", "bar"));
}
};

try (Jedis subscriber = createJedis()) {
long clientId = subscriber.clientId();
subscriber.sendCommand(CLIENT, "TRACKING", "ON", "REDIRECT", Long.toString(clientId), "BCAST");
subscriber.subscribe(pubsub, cacheInvalidate);
assertTrue("Subscriber didn't get any message.", onMessage.get());
}
}

@Test(timeout = 5000)
public void subscribeCacheInvalidateChannelBinary() {
final byte[] cacheInvalidate = "__redis__:invalidate".getBytes();
final AtomicBoolean onMessage = new AtomicBoolean(false);
final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() {
@Override public void onMessage(byte[] channel, byte[] message) {
onMessage.set(true);
assertArrayEquals(cacheInvalidate, channel);
if (message != null) {
assertArrayEquals("foo".getBytes(), message);
consumeJedis(j -> j.flushAll());
} else {
unsubscribe(channel);
}
}

@Override public void onSubscribe(byte[] channel, int subscribedChannels) {
assertArrayEquals(cacheInvalidate, channel);
consumeJedis(j -> j.set("foo".getBytes(), "bar".getBytes()));
}
};

try (Jedis subscriber = createJedis()) {
long clientId = subscriber.clientId();
subscriber.sendCommand(CLIENT, "TRACKING", "ON", "REDIRECT", Long.toString(clientId), "BCAST");
subscriber.subscribe(pubsub, cacheInvalidate);
assertTrue("Subscriber didn't get any message.", onMessage.get());
}
}

private void consumeJedis(Consumer<Jedis> consumer) {
Thread t = new Thread(() -> consumer.accept(jedis));
t.start();
}
}

0 comments on commit 90cf1e7

Please sign in to comment.