diff --git a/pom.xml b/pom.xml index 0b8f8392..f6f209dd 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ 8 8 2.0.3 - 2.8.0 + 2.8.1 5.16.1 1.11 5.1.0 diff --git a/pulsar-jms-all/src/license/override-THIRD-PARTY.properties b/pulsar-jms-all/src/license/override-THIRD-PARTY.properties index 3c3a05d7..76b6b647 100644 --- a/pulsar-jms-all/src/license/override-THIRD-PARTY.properties +++ b/pulsar-jms-all/src/license/override-THIRD-PARTY.properties @@ -15,4 +15,4 @@ # net.jcip--jcip-annotations--1.0==Apache License, Version 2.0 -io.netty--netty-tcnative-boringssl-static--2.0.38.Final==Apache License, Version 2.0 +io.netty--netty-tcnative-boringssl-static--2.0.40.Final==Apache License, Version 2.0 diff --git a/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java b/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java index eddb0fcf..e709b558 100644 --- a/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java +++ b/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java @@ -36,13 +36,13 @@ public void testPulsar272() throws Exception { } @Test - public void testPulsar280() throws Exception { - test("apachepulsar/pulsar:2.8.0", false); + public void testPulsar281() throws Exception { + test("apachepulsar/pulsar:2.8.1", false); } @Test - public void testPulsar280Transactions() throws Exception { - test("apachepulsar/pulsar:2.8.0", true); + public void testPulsar281Transactions() throws Exception { + test("apachepulsar/pulsar:2.8.1", true); } private void test(String image, boolean transactions) throws Exception { diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index fabb557a..5f71e562 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -29,21 +29,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; +import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.InvalidClientIDException; -import javax.jms.JMSContext; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.JMSSecurityException; -import javax.jms.JMSSecurityRuntimeException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -68,6 +55,9 @@ @Slf4j public class PulsarConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, AutoCloseable { + + private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; + private static final Set clientIdentifiers = new ConcurrentSkipListSet<>(); private final Map> producers = new ConcurrentHashMap<>(); @@ -972,9 +962,20 @@ public boolean deleteSubscription(PulsarDestination destination, String name) // required for TCK, scan for all subscriptions List allTopics = pulsarAdmin.topics().getList(systemNamespace); for (String topic : allTopics) { + if (topic.endsWith(PENDING_ACK_STORE_SUFFIX)) { + // skip Transaction related system topics + log.info("Ignoring system topic {}", topic); + continue; + } log.info("Scanning topic {}", topic); - List subscriptions = pulsarAdmin.topics().getSubscriptions(topic); - log.info("Subscriptions {}", subscriptions); + List subscriptions; + try { + subscriptions = pulsarAdmin.topics().getSubscriptions(topic); + log.info("Subscriptions {}", subscriptions); + } catch (PulsarAdminException.NotFoundException notFound) { + log.error("Skipping topic {}", topic); + subscriptions = Collections.emptyList(); + } for (String subscription : subscriptions) { log.info("Found subscription {} ", subscription); if (subscription.equals(name)) { diff --git a/resource-adapter-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/PulsarContainer.java b/resource-adapter-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/PulsarContainer.java index 9c396b6a..fcad2a74 100644 --- a/resource-adapter-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/PulsarContainer.java +++ b/resource-adapter-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/PulsarContainer.java @@ -34,7 +34,7 @@ public PulsarContainer(Network network) { public void start() throws Exception { CountDownLatch pulsarReady = new CountDownLatch(1); pulsarContainer = - new GenericContainer<>("apachepulsar/pulsar:2.8.0") + new GenericContainer<>("apachepulsar/pulsar:2.8.1") .withNetwork(network) .withNetworkAliases("pulsar") .withCommand( diff --git a/resource-adapter/src/license/override-THIRD-PARTY.properties b/resource-adapter/src/license/override-THIRD-PARTY.properties index 3c3a05d7..76b6b647 100644 --- a/resource-adapter/src/license/override-THIRD-PARTY.properties +++ b/resource-adapter/src/license/override-THIRD-PARTY.properties @@ -15,4 +15,4 @@ # net.jcip--jcip-annotations--1.0==Apache License, Version 2.0 -io.netty--netty-tcnative-boringssl-static--2.0.38.Final==Apache License, Version 2.0 +io.netty--netty-tcnative-boringssl-static--2.0.40.Final==Apache License, Version 2.0 diff --git a/tck-executor/start_pulsar.sh b/tck-executor/start_pulsar.sh index 3b6d41b2..462aac86 100755 --- a/tck-executor/start_pulsar.sh +++ b/tck-executor/start_pulsar.sh @@ -2,7 +2,7 @@ set -x -e -IMAGENAME=apachepulsar/pulsar:2.8.0 +IMAGENAME=apachepulsar/pulsar:2.8.1 HERE=$(dirname $0) HERE=$(realpath "$HERE")