diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 225016cae148c..8f868dc9c9224 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -21,14 +21,17 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; import static org.slf4j.bridge.SLF4JBridgeHandler.install; import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger; +import com.google.common.annotations.VisibleForTesting; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; @@ -82,6 +85,8 @@ public class ProxyServiceStarter { @Parameter(names = { "-h", "--help" }, description = "Show this help message") private boolean help = false; + private ProxyConfiguration config; + public ProxyServiceStarter(String[] args) throws Exception { try { @@ -108,7 +113,7 @@ public ProxyServiceStarter(String[] args) throws Exception { } // load config file - final ProxyConfiguration config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class); + config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class); if (!isBlank(zookeeperServers)) { // Use zookeeperServers from command line @@ -136,55 +141,58 @@ public ProxyServiceStarter(String[] args) throws Exception { checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided"); } - AuthenticationService authenticationService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(config)); - // create proxy service - ProxyService proxyService = new ProxyService(config, authenticationService); - // create a web-service - final WebServer server = new WebServer(config, authenticationService); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - proxyService.close(); - server.stop(); - } catch (Exception e) { - log.warn("server couldn't stop gracefully {}", e.getMessage(), e); - } - })); + } catch (Exception e) { + log.error("Failed to start pulsar proxy service. error msg " + e.getMessage(), e); + throw new PulsarServerException(e); + } + } - proxyService.start(); + public static void main(String[] args) throws Exception { + ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args); + serviceStarter.start(); + } - // Setup metrics - DefaultExports.initialize(); + public void start() throws Exception { + AuthenticationService authenticationService = new AuthenticationService( + PulsarConfigurationLoader.convertFrom(config)); + // create proxy service + ProxyService proxyService = new ProxyService(config, authenticationService); + // create a web-service + final WebServer server = new WebServer(config, authenticationService); - // Report direct memory from Netty counters - Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() { - @Override - public double get() { - return getJvmDirectMemoryUsed(); - } - }).register(CollectorRegistry.defaultRegistry); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + proxyService.close(); + server.stop(); + } catch (Exception e) { + log.warn("server couldn't stop gracefully {}", e.getMessage(), e); + } + })); - Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() { - @Override - public double get() { - return PlatformDependent.maxDirectMemory(); - } - }).register(CollectorRegistry.defaultRegistry); + proxyService.start(); - addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider()); + // Setup metrics + DefaultExports.initialize(); - // start web-service - server.start(); + // Report direct memory from Netty counters + Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() { + @Override + public double get() { + return getJvmDirectMemoryUsed(); + } + }).register(CollectorRegistry.defaultRegistry); - } catch (Exception e) { - log.error("Failed to start pulsar proxy service. error msg " + e.getMessage(), e); - throw new PulsarServerException(e); - } - } + Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() { + @Override + public double get() { + return PlatformDependent.maxDirectMemory(); + } + }).register(CollectorRegistry.defaultRegistry); - public static void main(String[] args) throws Exception { - new ProxyServiceStarter(args); + addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider()); + + // start web-service + server.start(); } public static void addWebServerHandlers(WebServer server, @@ -225,7 +233,7 @@ public static void addWebServerHandlers(WebServer server, if (config.isWebSocketServiceEnabled()) { // add WebSocket servlet // Use local broker address to avoid different IP address when using a VIP for service discovery - WebSocketService webSocketService = new WebSocketService(null, PulsarConfigurationLoader.convertFrom(config)); + WebSocketService webSocketService = new WebSocketService(createClusterData(config), PulsarConfigurationLoader.convertFrom(config)); webSocketService.start(); final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService); server.addServlet(WebSocketProducerServlet.SERVLET_PATH, @@ -253,6 +261,29 @@ public static void addWebServerHandlers(WebServer server, } } + private static ClusterData createClusterData(ProxyConfiguration config) { + if (isNotBlank(config.getBrokerServiceURL()) || isNotBlank(config.getBrokerServiceURLTLS())) { + return ClusterData.builder() + .serviceUrl(config.getBrokerWebServiceURL()) + .serviceUrlTls(config.getBrokerWebServiceURLTLS()) + .brokerServiceUrl(config.getBrokerServiceURL()) + .brokerServiceUrlTls(config.getBrokerServiceURLTLS()) + .build(); + } else if (isNotBlank(config.getBrokerWebServiceURL()) || isNotBlank(config.getBrokerWebServiceURLTLS())) { + return ClusterData.builder() + .serviceUrl(config.getBrokerWebServiceURL()) + .serviceUrlTls(config.getBrokerWebServiceURLTLS()) + .build(); + } else { + return null; + } + } + + @VisibleForTesting + public ProxyConfiguration getConfig() { + return config; + } + private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java new file mode 100644 index 0000000000000..63d7e3986cc67 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import lombok.Cleanup; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; + +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.websocket.data.ProducerMessage; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Base64; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Future; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { + + @Override + @BeforeClass + protected void setup() throws Exception { + internalSetup(); + String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"}; + ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args); + serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress()); + serviceStarter.getConfig().setServicePort(Optional.of(11000)); + serviceStarter.getConfig().setWebSocketServiceEnabled(true); + serviceStarter.start(); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + internalCleanup(); + } + + @Test + public void testEnableWebSocketServer() throws Exception { + HttpClient httpClient = new HttpClient(); + WebSocketClient webSocketClient = new WebSocketClient(httpClient); + webSocketClient.start(); + MyWebSocket myWebSocket = new MyWebSocket(); + String webSocketUri = "ws://localhost:8080/ws/pingpong"; + Future sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri)); + sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes())); + assertTrue(myWebSocket.getResponse().contains("ping")); + } + + @Test + public void testProducer() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:11000") + .build(); + + @Cleanup + Producer producer = client.newProducer() + .topic("persistent://sample/test/local/websocket-topic") + .create(); + + for (int i = 0; i < 10; i++) { + producer.send("test".getBytes()); + } + } + + @Test + public void testProduceAndConsumeMessageWithWebsocket() throws Exception { + HttpClient producerClient = new HttpClient(); + WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient); + producerWebSocketClient.start(); + MyWebSocket producerSocket = new MyWebSocket(); + String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic"; + Future producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri)); + + ProducerMessage produceRequest = new ProducerMessage(); + produceRequest.setContext("context"); + produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes())); + + HttpClient consumerClient = new HttpClient(); + WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient); + consumerWebSocketClient.start(); + MyWebSocket consumerSocket = new MyWebSocket(); + String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub"; + Future consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri)); + consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes())); + producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest)); + assertTrue(consumerSocket.getResponse().contains("ping")); + ProducerMessage message = ObjectMapperFactory.getThreadLocal().readValue(consumerSocket.getResponse(), ProducerMessage.class); + assertEquals(new String(Base64.getDecoder().decode(message.getPayload())), "my payload"); + } + + @WebSocket + public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener { + + ArrayBlockingQueue incomingMessages = new ArrayBlockingQueue<>(10); + + @Override + public void onWebSocketText(String message) { + incomingMessages.add(message); + } + + @Override + public void onWebSocketClose(int i, String s) { + } + + @Override + public void onWebSocketConnect(Session session) { + } + + @Override + public void onWebSocketError(Throwable throwable) { + } + + @Override + public void onWebSocketPing(ByteBuffer payload) { + } + + @Override + public void onWebSocketPong(ByteBuffer payload) { + incomingMessages.add(BufferUtil.toDetailString(payload)); + } + + public String getResponse() throws InterruptedException { + return incomingMessages.take(); + } + } + +} diff --git a/pulsar-proxy/src/test/resources/proxy.conf b/pulsar-proxy/src/test/resources/proxy.conf new file mode 100644 index 0000000000000..b5ed33f03abb6 --- /dev/null +++ b/pulsar-proxy/src/test/resources/proxy.conf @@ -0,0 +1,236 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +### --- Broker Discovery --- ### + +# The ZooKeeper quorum connection string (as a comma-separated list) +zookeeperServers= + +# Configuration store connection string (as a comma-separated list) +configurationStoreServers= + +# if Service Discovery is Disabled this url should point to the discovery service provider. +brokerServiceURL=pulsar://0.0.0.0:0 +brokerServiceURLTLS= + +# These settings are unnecessary if `zookeeperServers` is specified +brokerWebServiceURL=http://0.0.0.0:0 +brokerWebServiceURLTLS= + +# If function workers are setup in a separate cluster, configure the following 2 settings +# to point to the function workers cluster +functionWorkerWebServiceURL= +functionWorkerWebServiceURLTLS= + +# ZooKeeper session timeout (in milliseconds) +zookeeperSessionTimeoutMs=30000 + +# ZooKeeper cache expiry time in seconds +zooKeeperCacheExpirySeconds=300 + +### --- Server --- ### + +# Hostname or IP address the service binds on, default is 0.0.0.0. +bindAddress=0.0.0.0 + +# Hostname or IP address the service advertises to the outside world. +# If not set, the value of `InetAddress.getLocalHost().getHostname()` is used. +advertisedAddress= + +# Enable or disable the HAProxy protocol. +haProxyProtocolEnabled=false + +# The port to use for server binary Protobuf requests +servicePort=6650 + +# The port to use to server binary Protobuf TLS requests +servicePortTls= + +# Port that discovery service listen on +webServicePort=8080 + +# Port to use to server HTTPS request +webServicePortTls= + +# Path for the file used to determine the rotation status for the proxy instance when responding +# to service discovery health checks +statusFilePath= + +# Proxy log level, default is 0. +# 0: Do not log any tcp channel info +# 1: Parse and log any tcp channel info and command info without message body +# 2: Parse and log channel info, command info and message body +proxyLogLevel=0 + +### ---Authorization --- ### + +# Role names that are treated as "super-users," meaning that they will be able to perform all admin +# operations and publish/consume to/from all topics (as a comma-separated list) +superUserRoles= + +# Whether authorization is enforced by the Pulsar proxy +authorizationEnabled=false + +# Authorization provider as a fully qualified class name +authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider + +# Whether client authorization credentials are forwared to the broker for re-authorization. +# Authentication must be enabled via authenticationEnabled=true for this to take effect. +forwardAuthorizationCredentials=false + +### --- Authentication --- ### + +# Whether authentication is enabled for the Pulsar proxy +authenticationEnabled=false + +# Authentication provider name list (a comma-separated list of class names) +authenticationProviders= + +# When this parameter is not empty, unauthenticated users perform as anonymousUserRole +anonymousUserRole= + +### --- Client Authentication --- ### + +# The three brokerClient* authentication settings below are for the proxy itself and determine how it +# authenticates with Pulsar brokers + +# The authentication plugin used by the Pulsar proxy to authenticate with Pulsar brokers +brokerClientAuthenticationPlugin= + +# The authentication parameters used by the Pulsar proxy to authenticate with Pulsar brokers +brokerClientAuthenticationParameters= + +# The path to trusted certificates used by the Pulsar proxy to authenticate with Pulsar brokers +brokerClientTrustCertsFilePath= + +# Whether TLS is enabled when communicating with Pulsar brokers +tlsEnabledWithBroker=false + +# Tls cert refresh duration in seconds (set 0 to check on every new connection) +tlsCertRefreshCheckDurationSec=300 + +##### --- Rate Limiting --- ##### + +# Max concurrent inbound connections. The proxy will reject requests beyond that. +maxConcurrentInboundConnections=10000 + +# Max concurrent outbound connections. The proxy will error out requests beyond that. +maxConcurrentLookupRequests=50000 + +##### --- TLS --- ##### + +# Deprecated - use servicePortTls and webServicePortTls instead +tlsEnabledInProxy=false + +# Path for the TLS certificate file +tlsCertificateFilePath= + +# Path for the TLS private key file +tlsKeyFilePath= + +# Path for the trusted TLS certificate file. +# This cert is used to verify that any certs presented by connecting clients +# are signed by a certificate authority. If this verification +# fails, then the certs are untrusted and the connections are dropped. +tlsTrustCertsFilePath= + +# Accept untrusted TLS certificate from client. +# If true, a client with a cert which cannot be verified with the +# 'tlsTrustCertsFilePath' cert will allowed to connect to the server, +# though the cert will not be used for client authentication. +tlsAllowInsecureConnection=false + +# Whether the hostname is validated when the proxy creates a TLS connection with brokers +tlsHostnameVerificationEnabled=false + +# Specify the tls protocols the broker will use to negotiate during TLS handshake +# (a comma-separated list of protocol names). +# Examples:- [TLSv1.3, TLSv1.2] +tlsProtocols= + +# Specify the tls cipher the broker will use to negotiate during TLS Handshake +# (a comma-separated list of ciphers). +# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256] +tlsCiphers= + +# Whether client certificates are required for TLS. Connections are rejected if the client +# certificate isn't trusted. +tlsRequireTrustedClientCertOnConnect=false + +##### --- HTTP --- ##### + +# Http directs to redirect to non-pulsar services. +httpReverseProxyConfigs= + +# Http output buffer size. The amount of data that will be buffered for http requests +# before it is flushed to the channel. A larger buffer size may result in higher http throughput +# though it may take longer for the client to see data. +# If using HTTP streaming via the reverse proxy, this should be set to the minimum value, 1, +# so that clients see the data as soon as possible. +httpOutputBufferSize=32768 + +# Number of threads to use for HTTP requests processing. Default is +# 2 * Runtime.getRuntime().availableProcessors() +httpNumThreads= + +# Enable the enforcement of limits on the incoming HTTP requests +httpRequestsLimitEnabled=false + +# Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests) +httpRequestsMaxPerSecond=100.0 + + +### --- Token Authentication Provider --- ### + +## Symmetric key +# Configure the secret key to be used to validate auth tokens +# The key can be specified like: +# tokenSecretKey=data:;base64,xxxxxxxxx +# tokenSecretKey=file:///my/secret.key ( Note: key file must be DER-encoded ) +tokenSecretKey= + +## Asymmetric public/private key pair +# Configure the public key to be used to validate auth tokens +# The key can be specified like: +# tokenPublicKey=data:;base64,xxxxxxxxx +# tokenPublicKey=file:///my/public.key ( Note: key file must be DER-encoded ) +tokenPublicKey= + +# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank) +tokenAuthClaim= + +# The token audience "claim" name, e.g. "aud", that will be used to get the audience from token. +# If not set, audience will not be verified. +tokenAudienceClaim= + +# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this. +tokenAudience= + +### --- WebSocket config variables --- ### + +# Enable or disable the WebSocket servlet. +webSocketServiceEnabled=false + +# Name of the cluster to which this broker belongs to +clusterName= + +### --- Deprecated config variables --- ### + +# Deprecated. Use configurationStoreServers +globalZookeeperServers=