From dc99f777e0850e954521d2a7947e6c953088bdd1 Mon Sep 17 00:00:00 2001 From: Tboy Date: Wed, 7 Jul 2021 20:32:53 +0800 Subject: [PATCH] 1. Fix websocket tls bug. (#11243) 2. Add ProxyServiceTlsStarterTest. 3. Refactor ProxyServiceStarter to be more easy to test. --- .../proxy/server/ProxyServiceStarter.java | 33 +++- .../proxy/server/ProxyServiceStarterTest.java | 8 +- .../server/ProxyServiceTlsStarterTest.java | 165 ++++++++++++++++++ 3 files changed, 195 insertions(+), 11 deletions(-) create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 8f868dc9c9224f..6c151dd09a6a13 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; @@ -87,6 +88,10 @@ public class ProxyServiceStarter { private ProxyConfiguration config; + private ProxyService proxyService; + + private WebServer server; + public ProxyServiceStarter(String[] args) throws Exception { try { @@ -156,17 +161,12 @@ public void start() throws Exception { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(config)); // create proxy service - ProxyService proxyService = new ProxyService(config, authenticationService); + proxyService = new ProxyService(config, authenticationService); // create a web-service - final WebServer server = new WebServer(config, authenticationService); + server = new WebServer(config, authenticationService); Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - proxyService.close(); - server.stop(); - } catch (Exception e) { - log.warn("server couldn't stop gracefully {}", e.getMessage(), e); - } + close(); })); proxyService.start(); @@ -195,6 +195,19 @@ public double get() { server.start(); } + public void close() { + try { + if(proxyService != null) { + proxyService.close(); + } + if(server != null) { + server.stop(); + } + } catch (Exception e) { + log.warn("server couldn't stop gracefully {}", e.getMessage(), e); + } + } + public static void addWebServerHandlers(WebServer server, ProxyConfiguration config, ProxyService service, @@ -233,7 +246,9 @@ public static void addWebServerHandlers(WebServer server, if (config.isWebSocketServiceEnabled()) { // add WebSocket servlet // Use local broker address to avoid different IP address when using a VIP for service discovery - WebSocketService webSocketService = new WebSocketService(createClusterData(config), PulsarConfigurationLoader.convertFrom(config)); + ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config); + serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker()); + WebSocketService webSocketService = new WebSocketService(createClusterData(config), serviceConfiguration); webSocketService.start(); final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService); server.addServlet(WebSocketProducerServlet.SERVLET_PATH, diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index 63d7e3986cc672..3377ec266a7137 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -48,12 +48,15 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { + static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"}; + + private ProxyServiceStarter serviceStarter; + @Override @BeforeClass protected void setup() throws Exception { internalSetup(); - String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"}; - ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args); + serviceStarter = new ProxyServiceStarter(ARGS); serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl()); serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress()); serviceStarter.getConfig().setServicePort(Optional.of(11000)); @@ -65,6 +68,7 @@ protected void setup() throws Exception { @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { internalCleanup(); + serviceStarter.close(); } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java new file mode 100644 index 00000000000000..7e6c0f5f25f6c1 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import lombok.Cleanup; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.websocket.data.ProducerMessage; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Base64; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Future; + +import static org.apache.pulsar.proxy.server.ProxyServiceStarterTest.ARGS; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest { + + private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem"; + private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem"; + private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem"; + private ProxyServiceStarter serviceStarter; + + @Override + @BeforeClass + protected void setup() throws Exception { + internalSetup(); + serviceStarter = new ProxyServiceStarter(ARGS); + serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); + serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress()); + serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); + serviceStarter.getConfig().setServicePortTls(Optional.of(11043)); + serviceStarter.getConfig().setTlsEnabledWithBroker(true); + serviceStarter.getConfig().setWebSocketServiceEnabled(true); + serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); + serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH); + serviceStarter.start(); + } + + protected void doInitConf() throws Exception { + super.doInitConf(); + this.conf.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); + this.conf.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + internalCleanup(); + serviceStarter.close(); + } + + @Test + public void testProducer() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043") + .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH) + .build(); + + @Cleanup + Producer producer = client.newProducer() + .topic("persistent://sample/test/local/websocket-topic") + .create(); + + for (int i = 0; i < 10; i++) { + producer.send("test".getBytes()); + } + } + + @Test + public void testProduceAndConsumeMessageWithWebsocket() throws Exception { + HttpClient producerClient = new HttpClient(); + WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient); + producerWebSocketClient.start(); + MyWebSocket producerSocket = new MyWebSocket(); + String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic"; + Future producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri)); + + ProducerMessage produceRequest = new ProducerMessage(); + produceRequest.setContext("context"); + produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes())); + + HttpClient consumerClient = new HttpClient(); + WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient); + consumerWebSocketClient.start(); + MyWebSocket consumerSocket = new MyWebSocket(); + String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub"; + Future consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri)); + consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes())); + producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest)); + assertTrue(consumerSocket.getResponse().contains("ping")); + ProducerMessage message = ObjectMapperFactory.getThreadLocal().readValue(consumerSocket.getResponse(), ProducerMessage.class); + assertEquals(new String(Base64.getDecoder().decode(message.getPayload())), "my payload"); + } + + @WebSocket + public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener { + + ArrayBlockingQueue incomingMessages = new ArrayBlockingQueue<>(10); + + @Override + public void onWebSocketText(String message) { + incomingMessages.add(message); + } + + @Override + public void onWebSocketClose(int i, String s) { + } + + @Override + public void onWebSocketConnect(Session session) { + } + + @Override + public void onWebSocketError(Throwable throwable) { + } + + @Override + public void onWebSocketPing(ByteBuffer payload) { + } + + @Override + public void onWebSocketPong(ByteBuffer payload) { + incomingMessages.add(BufferUtil.toDetailString(payload)); + } + + public String getResponse() throws InterruptedException { + return incomingMessages.take(); + } + } + +}