Skip to content

Commit

Permalink
Update Pulsar client to 2.8.1 and imported third party libs like Netty
Browse files Browse the repository at this point in the history
Ignore system topics in unsubscribe API (workaround for apache/pulsar#12727)
  • Loading branch information
eolivelli authored and sijie committed Jul 27, 2023
1 parent 8d80704 commit e6a6220
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -48,7 +48,7 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jms.version>2.0.3</jms.version>
<pulsar.version>2.8.0</pulsar.version>
<pulsar.version>2.8.1</pulsar.version>
<activemq.version>5.16.1</activemq.version>
<hawtbuf.version>1.11</hawtbuf.version>
<curator.version>5.1.0</curator.version>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms-all/src/license/override-THIRD-PARTY.properties
Expand Up @@ -15,4 +15,4 @@
#

net.jcip--jcip-annotations--1.0==Apache License, Version 2.0
io.netty--netty-tcnative-boringssl-static--2.0.38.Final==Apache License, Version 2.0
io.netty--netty-tcnative-boringssl-static--2.0.40.Final==Apache License, Version 2.0
Expand Up @@ -36,13 +36,13 @@ public void testPulsar272() throws Exception {
}

@Test
public void testPulsar280() throws Exception {
test("apachepulsar/pulsar:2.8.0", false);
public void testPulsar281() throws Exception {
test("apachepulsar/pulsar:2.8.1", false);
}

@Test
public void testPulsar280Transactions() throws Exception {
test("apachepulsar/pulsar:2.8.0", true);
public void testPulsar281Transactions() throws Exception {
test("apachepulsar/pulsar:2.8.1", true);
}

private void test(String image, boolean transactions) throws Exception {
Expand Down
Expand Up @@ -29,21 +29,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.JMSSecurityException;
import javax.jms.JMSSecurityRuntimeException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -68,6 +55,9 @@
@Slf4j
public class PulsarConnectionFactory
implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, AutoCloseable {

private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";

private static final Set<String> clientIdentifiers = new ConcurrentSkipListSet<>();

private final Map<String, Producer<byte[]>> producers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -972,9 +962,20 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
// required for TCK, scan for all subscriptions
List<String> allTopics = pulsarAdmin.topics().getList(systemNamespace);
for (String topic : allTopics) {
if (topic.endsWith(PENDING_ACK_STORE_SUFFIX)) {
// skip Transaction related system topics
log.info("Ignoring system topic {}", topic);
continue;
}
log.info("Scanning topic {}", topic);
List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topic);
log.info("Subscriptions {}", subscriptions);
List<String> subscriptions;
try {
subscriptions = pulsarAdmin.topics().getSubscriptions(topic);
log.info("Subscriptions {}", subscriptions);
} catch (PulsarAdminException.NotFoundException notFound) {
log.error("Skipping topic {}", topic);
subscriptions = Collections.emptyList();
}
for (String subscription : subscriptions) {
log.info("Found subscription {} ", subscription);
if (subscription.equals(name)) {
Expand Down
Expand Up @@ -34,7 +34,7 @@ public PulsarContainer(Network network) {
public void start() throws Exception {
CountDownLatch pulsarReady = new CountDownLatch(1);
pulsarContainer =
new GenericContainer<>("apachepulsar/pulsar:2.8.0")
new GenericContainer<>("apachepulsar/pulsar:2.8.1")
.withNetwork(network)
.withNetworkAliases("pulsar")
.withCommand(
Expand Down
Expand Up @@ -15,4 +15,4 @@
#

net.jcip--jcip-annotations--1.0==Apache License, Version 2.0
io.netty--netty-tcnative-boringssl-static--2.0.38.Final==Apache License, Version 2.0
io.netty--netty-tcnative-boringssl-static--2.0.40.Final==Apache License, Version 2.0
2 changes: 1 addition & 1 deletion tck-executor/start_pulsar.sh
Expand Up @@ -2,7 +2,7 @@

set -x -e

IMAGENAME=apachepulsar/pulsar:2.8.0
IMAGENAME=apachepulsar/pulsar:2.8.1

HERE=$(dirname $0)
HERE=$(realpath "$HERE")
Expand Down

0 comments on commit e6a6220

Please sign in to comment.