Skip to content

Commit

Permalink
Websocket should pass the encryption context to the consumers (#12539)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Oct 30, 2021
1 parent c1e7f94 commit 27e121f
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -54,9 +56,11 @@

import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TopicType;
Expand Down Expand Up @@ -935,6 +939,64 @@ public void ackBatchMessageTest() throws Exception {
}
}

@Test(timeOut = 20000)
public void consumeEncryptedMessages() throws Exception {
final String subscription = "my-sub";
final String topic = "my-property/my-ns/encrypted" + UUID.randomUUID();
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v2/consumer/persistent/" + topic + "/" + subscription + "?cryptoFailureAction=CONSUME";
final int messages = 10;

WebSocketClient consumerClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();


final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.defaultCryptoKeyReader(rsaPublicKeyData)
.addEncryptionKey("ws-consumer-a")
.create();

try {
consumerClient.start();
ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest);

assertTrue(consumerFuture.get().isOpen());
assertEquals(consumeSocket.getReceivedMessagesCount(), 0);

for (int i = 0; i < messages; i++) {
producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}

producer.flush();
consumeSocket.sendPermits(messages);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));

for (JsonObject msg : consumeSocket.messages) {
assertTrue(msg.has("encryptionContext"));
JsonObject encryptionCtx = msg.getAsJsonObject("encryptionContext");
JsonObject keys = encryptionCtx.getAsJsonObject("keys");
assertTrue(keys.has("ws-consumer-a"));

assertTrue(keys.getAsJsonObject("ws-consumer-a").has("keyValue"));
}

// The message should not be acked since we only acked 1 message of the batch message
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
.get(subscription).getMsgBacklog(), 0));

} finally {
stopWebSocketClient(consumerClient);
}
}

private void verifyTopicStat(Client client, String baseUrl, String topic) {
String statUrl = baseUrl + topic + "/stats";
WebTarget webTarget = client.target(statUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ public class SimpleConsumerSocket {
private final CountDownLatch closeLatch;
private Session session;
private final ArrayList<String> consumerBuffer;
final ArrayList<JsonObject> messages;
private final AtomicInteger receivedMessages = new AtomicInteger();
// Custom message handler to override standard message processing, if it's needed
private SimpleConsumerMessageHandler customMessageHandler;

public SimpleConsumerSocket() {
this.closeLatch = new CountDownLatch(1);
consumerBuffer = new ArrayList<>();
this.messages = new ArrayList<>();
}

public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
Expand Down Expand Up @@ -79,6 +81,7 @@ public void onConnect(Session session) throws InterruptedException {
public synchronized void onMessage(String msg) throws JsonParseException, IOException {
receivedMessages.incrementAndGet();
JsonObject message = new Gson().fromJson(msg, JsonObject.class);
this.messages.add(message);
if (message.get(X_PULSAR_MESSAGE_ID) != null) {
String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
consumerBuffer.add(messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ private void receiveMessage() {
dm.properties = msg.getProperties();
dm.publishTime = DateFormatter.format(msg.getPublishTime());
dm.redeliveryCount = msg.getRedeliveryCount();
dm.encryptionContext = msg.getEncryptionCtx().orElse(null);
if (msg.getEventTime() != 0) {
dm.eventTime = DateFormatter.format(msg.getEventTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.pulsar.common.api.EncryptionContext;

@JsonInclude(Include.NON_NULL)
public class ConsumerMessage {
Expand All @@ -32,5 +33,7 @@ public class ConsumerMessage {
public int redeliveryCount;
public String eventTime;

public EncryptionContext encryptionContext;

public String key;
}

0 comments on commit 27e121f

Please sign in to comment.