diff --git a/pip/pip-347.md b/pip/pip-347.md index 5326fed353374..a5d5d76ae1700 100644 --- a/pip/pip-347.md +++ b/pip/pip-347.md @@ -34,4 +34,4 @@ Fully compatible. Updated afterwards --> * Mailing List discussion thread: https://lists.apache.org/thread/p9y9r8pb7ygk8f0jd121c1121phvzd09 -* Mailing List voting thread: +* Mailing List voting thread: https://lists.apache.org/thread/sfv0vq498dnjx6k6zdrnn0cw8f22tz05 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index fe9fbe6a4000c..c9f417c4bc4f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -208,6 +208,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo stats = new ConsumerStatsImpl(); stats.setAddress(cnx.clientSourceAddressAndPort()); stats.consumerName = consumerName; + stats.appId = appId; stats.setConnectedSince(DateFormatter.format(connectedSince)); stats.setClientVersion(cnx.getClientVersion()); stats.metadata = this.metadata; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java new file mode 100644 index 0000000000000..e8cadb72e1e04 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.time.Duration; +import java.util.Base64; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Properties; +import java.util.Set; + +public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{ + private final String ADMIN_TOKEN; + private final String TOKEN_PUBLIC_KEY; + private final KeyPair kp; + + AuthenticatedConsumerStatsTest() throws NoSuchAlgorithmException { + KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA"); + kp = kpg.generateKeyPair(); + + byte[] encodedPublicKey = kp.getPublic().getEncoded(); + TOKEN_PUBLIC_KEY = "data:;base64," + Base64.getEncoder().encodeToString(encodedPublicKey); + ADMIN_TOKEN = generateToken(kp, "admin"); + } + + + private String generateToken(KeyPair kp, String subject) { + PrivateKey pkey = kp.getPrivate(); + long expMillis = System.currentTimeMillis() + Duration.ofHours(1).toMillis(); + Date exp = new Date(expMillis); + + return Jwts.builder() + .setSubject(subject) + .setExpiration(exp) + .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey)) + .compact(); + } + + @Override + protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { + clientBuilder.authentication(AuthenticationFactory.token(ADMIN_TOKEN)); + } + + @Override + protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) { + pulsarAdminBuilder.authentication(AuthenticationFactory.token(ADMIN_TOKEN)); + } + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + + Set superUserRoles = new HashSet<>(); + superUserRoles.add("admin"); + conf.setSuperUserRoles(superUserRoles); + + Set providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN); + + conf.setClusterName("test"); + + // Set provider domain name + Properties properties = new Properties(); + properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY); + conf.setProperties(properties); + + super.internalSetup(); + super.producerBaseSetup(); + } + + @Test + public void testConsumerStatsOutput() throws Exception { + Set allowedFields = Sets.newHashSet( + "msgRateOut", + "msgThroughputOut", + "bytesOutCounter", + "msgOutCounter", + "messageAckRate", + "msgRateRedeliver", + "chunkedMessageRate", + "consumerName", + "availablePermits", + "unackedMessages", + "avgMessagesPerEntry", + "blockedConsumerOnUnackedMsgs", + "readPositionWhenJoining", + "lastAckedTime", + "lastAckedTimestamp", + "lastConsumedTime", + "lastConsumedTimestamp", + "lastConsumedFlowTimestamp", + "keyHashRanges", + "metadata", + "address", + "connectedSince", + "clientVersion", + "appId"); + + final String topicName = "persistent://public/default/testConsumerStatsOutput"; + final String subName = "my-subscription"; + + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subName) + .subscribe(); + + TopicStats stats = admin.topics().getStats(topicName); + ObjectMapper mapper = ObjectMapperFactory.create(); + ConsumerStats consumerStats = stats.getSubscriptions() + .get(subName).getConsumers().get(0); + Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0); + JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(consumerStats)); + Iterator itr = node.fieldNames(); + while (itr.hasNext()) { + String field = itr.next(); + Assert.assertTrue(allowedFields.contains(field), field + " should not be exposed"); + } + // assert that role is exposed + Assert.assertEquals(consumerStats.getAppId(), "admin"); + consumer.close(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 512a5cfcab661..024d8582fa213 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -195,7 +195,7 @@ public void testAckStatsOnPartitionedTopicForExclusiveSubscription() throws Puls @Test public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception { - final String topicName = "persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription"; + final String topicName = "persistent://public/default/testUpdateStatsForActiveConsumerAndSubscription"; pulsarClient.newConsumer() .topic(topicName) .subscriptionType(SubscriptionType.Shared) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index 8c9a615d6d01c..d2d3600df96ed 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -25,6 +25,9 @@ * Consumer statistics. */ public interface ConsumerStats { + /** the app id. */ + String getAppId(); + /** Total rate of messages delivered to the consumer (msg/s). */ double getMsgRateOut(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java index 548abdc9ada33..de36b330b7f1a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java @@ -30,6 +30,9 @@ */ @Data public class ConsumerStatsImpl implements ConsumerStats { + /** the app id. */ + public String appId; + /** Total rate of messages delivered to the consumer (msg/s). */ public double msgRateOut;