Skip to content

Commit

Permalink
Update Pulsar client to 2.8.1 and imported third party libs like Netty
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Nov 10, 2021
1 parent 6fc9641 commit 25e60f3
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jms.version>2.0.3</jms.version>
<pulsar.version>2.8.0</pulsar.version>
<pulsar.version>2.8.1</pulsar.version>
<activemq.version>5.16.1</activemq.version>
<hawtbuf.version>1.11</hawtbuf.version>
<curator.version>5.1.0</curator.version>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-jms-all/src/license/override-THIRD-PARTY.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
#

net.jcip--jcip-annotations--1.0==Apache License, Version 2.0
io.netty--netty-tcnative-boringssl-static--2.0.38.Final==Apache License, Version 2.0
io.netty--netty-tcnative-boringssl-static--2.0.40.Final==Apache License, Version 2.0
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ public void testPulsar272() throws Exception {
}

@Test
public void testPulsar280() throws Exception {
test("apachepulsar/pulsar:2.8.0", false);
public void testPulsar281() throws Exception {
test("apachepulsar/pulsar:2.8.1", false);
}

@Test
public void testPulsar280Transactions() throws Exception {
test("apachepulsar/pulsar:2.8.0", true);
public void testPulsar281Transactions() throws Exception {
test("apachepulsar/pulsar:2.8.1", true);
}

private void test(String image, boolean transactions) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.JMSSecurityException;
import javax.jms.JMSSecurityRuntimeException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -893,6 +880,8 @@ public Consumer<byte[]> createConsumer(
.newConsumer()
// these properties can be overridden by the configuration
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
// always set this for Transactions
.isAckReceiptEnabled(sessionMode == Session.SESSION_TRANSACTED)
.loadConf(getConsumerConfiguration())
// these properties cannot be overwritten by the configuration
.subscriptionInitialPosition(initialPosition)
Expand Down Expand Up @@ -973,8 +962,14 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
List<String> allTopics = pulsarAdmin.topics().getList(systemNamespace);
for (String topic : allTopics) {
log.info("Scanning topic {}", topic);
List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topic);
log.info("Subscriptions {}", subscriptions);
List<String> subscriptions;
try {
subscriptions = pulsarAdmin.topics().getSubscriptions(topic);
log.info("Subscriptions {}", subscriptions);
} catch (PulsarAdminException.NotFoundException notFound) {
log.error("Skipping topic {}", topic);
subscriptions = Collections.emptyList();
}
for (String subscription : subscriptions) {
log.info("Found subscription {} ", subscription);
if (subscription.equals(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public PulsarContainer(Network network) {
public void start() throws Exception {
CountDownLatch pulsarReady = new CountDownLatch(1);
pulsarContainer =
new GenericContainer<>("apachepulsar/pulsar:2.8.0")
new GenericContainer<>("apachepulsar/pulsar:2.8.1")
.withNetwork(network)
.withNetworkAliases("pulsar")
.withCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
#

net.jcip--jcip-annotations--1.0==Apache License, Version 2.0
io.netty--netty-tcnative-boringssl-static--2.0.38.Final==Apache License, Version 2.0
io.netty--netty-tcnative-boringssl-static--2.0.40.Final==Apache License, Version 2.0
2 changes: 1 addition & 1 deletion tck-executor/start_pulsar.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

set -x -e

IMAGENAME=apachepulsar/pulsar:2.8.0
IMAGENAME=apachepulsar/pulsar:2.8.1

HERE=$(dirname $0)
HERE=$(realpath "$HERE")
Expand Down

0 comments on commit 25e60f3

Please sign in to comment.