Skip to content

Commit

Permalink
Make KeyValueSchema an interface visible in the public Schema API (#1…
Browse files Browse the repository at this point in the history
…0888)

* Make KeyValueSchema an interface visible in the public Schema API
- allow users of pulsar-client-api to use KeyValueSchema
- move KeyValueSchema implementation to KeyValueSchemaImpl
- introduce a new interface KeyValueSchema
  • Loading branch information
eolivelli committed Jun 10, 2021
1 parent c75d45b commit 18f2f4a
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 183 deletions.
Expand Up @@ -20,33 +20,23 @@

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.concurrent.CompletableFuture;

import static org.testng.Assert.assertEquals;

/**
Expand Down Expand Up @@ -83,15 +73,15 @@ public void keyValueAutoConsumeTest() throws Exception {

@Cleanup
Producer<KeyValue<GenericRecord, GenericRecord>> producer = pulsarClient
.newProducer(KeyValueSchema.of(schema, schema))
.newProducer(KeyValueSchemaImpl.of(schema, schema))
.topic(topic)
.create();

producer.newMessage().value(new KeyValue<>(key, value)).send();

@Cleanup
Consumer<KeyValue<GenericRecord, GenericRecord>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()))
.newConsumer(KeyValueSchemaImpl.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()))
.topic(topic)
.subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
Expand Down Expand Up @@ -173,14 +173,14 @@ public void keyValueNullInlineTest(String topic, int partitions)

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.newProducer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING))
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

@Cleanup
Consumer<KeyValue<String, String>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.newConsumer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING))
.topic(topic)
.subscriptionName("test")
.subscribe();
Expand Down Expand Up @@ -220,7 +220,7 @@ public void keyValueNullSeparatedTest(String topic, int partitions)

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.newProducer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
// The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is
// SEPARATED so we need to define a message router to guarantee the message order.
Expand All @@ -234,7 +234,7 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {

@Cleanup
Consumer<KeyValue<String, String>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.newConsumer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("test")
.subscribe();
Expand Down

0 comments on commit 18f2f4a

Please sign in to comment.