Skip to content

Commit

Permalink
[Issue 11007] add a version of AUTO_PRODUCE_BYTES that doesn't valida…
Browse files Browse the repository at this point in the history
…te the message in `encode` (#11238)

Fixes #11007

### Motivation

When ingesting event/message data from external systems such as Kafka and Cassandra, the events are very often already serialized with Avro, and their schemas are available as well. In such cases, a Pulsar producer doesn't need to perform the validation step again when sending the events to a topic.

### Modifications

Introduce a new class `AutoProduceValidatedAvroBytesSchema` that ~~extends `AutoProduceBytesSchema`~~ implements `Schema<byte[]>`.

~~TODO: make the `public AutoProduceValidatedAvroBytesSchema(org.apache.avro.Schema schema)` constructor accessible to the client.~~

Add `NATIVE_AVRO` method to `org.apache.pulsar.client.api.Schema` which calls `AutoProduceValidatedAvroBytesSchema`'s constructor via reflection.
  • Loading branch information
Zhen-hao committed Jul 22, 2021
1 parent 4d3fdae commit a78b029
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 0 deletions.
Expand Up @@ -39,6 +39,7 @@
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
Expand Down Expand Up @@ -72,6 +73,7 @@
import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand All @@ -88,6 +90,7 @@
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -3997,6 +4000,49 @@ public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) th
assertEquals(1, res.getFields().size());
}

/**
 * Sends one pre-serialized Avro payload through {@code Schema.NATIVE_AVRO} on a producer that was
 * created without a schema (LZ4 compression enabled, batching driven by the data provider), then
 * reads it back with an {@code AUTO_CONSUME} consumer and checks the decoded record.
 *
 * @param enableBatching whether the producer is created with batching enabled
 * @throws Exception if any client operation fails
 */
@Test(dataProvider = "enableBatching")
public void testNativeAvroSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {
    log.info("-- Starting {} test --", methodName);

    final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed";
    // try-with-resources: the consumer is released even if send/receive or an assertion throws
    try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
            .topic(topic)
            .subscriptionName("testsub")
            .subscribe()) {

        // initially we are not setting a Schema in the producer
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
                .enableBatching(enableBatching)
                .compressionType(CompressionType.LZ4)
                .create()) {
            MyBean payload = new MyBean();
            payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa");

            // now we send with a schema, but we have enabled compression and batching
            // the producer will have to setup the schema and resume the send
            Schema<MyBean> myBeanSchema = Schema.AVRO(MyBean.class);
            byte[] schemaBytes = myBeanSchema.getSchemaInfo().getSchema();
            org.apache.avro.Schema schemaAvroNative = new Parser().parse(new ByteArrayInputStream(schemaBytes));
            AvroWriter<MyBean> writer = new AvroWriter<>(schemaAvroNative);
            byte[] content = writer.write(payload);
            producer.newMessage(Schema.NATIVE_AVRO(schemaAvroNative)).value(content).send();
        } // producer is closed here, before the message is consumed (same order as before)

        GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue();
        assertEquals(SchemaType.AVRO, res.getSchemaType());
        org.apache.avro.generic.GenericRecord nativeRecord =
                (org.apache.avro.generic.GenericRecord) res.getNativeObject();
        for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
            log.info("field {} {}", f.getName(), res.getField(f));
            assertEquals("field", f.getName());
            assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
            // the wrapped Avro record must expose the same value as the Pulsar view
            assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", nativeRecord.get(f.getName()).toString());
        }

        assertEquals(1, res.getFields().size());
    }
}

@DataProvider(name = "avroSchemaProvider")
public static Object[] avroSchemaProvider() {
return new Object[]{Schema.AVRO(MyBean.class), Schema.JSON(MyBean.class)};
Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.reader.AvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
Expand Down Expand Up @@ -242,6 +243,17 @@ public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
} catch (PulsarClientException e) {
assertTrue(e.getCause() instanceof SchemaSerializationException);
}

Schema<V1Data> v1Schema = Schema.AVRO(V1Data.class);
byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema();
org.apache.avro.Schema v1SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v1SchemaBytes));
// if using NATIVE_AVRO, producer can connect but the publish will fail
try (Producer<byte[]> p = pulsarClient.newProducer(Schema.NATIVE_AVRO(v1SchemaAvroNative)).topic(topic).create()) {
p.send("junkdata".getBytes(UTF_8));
} catch (PulsarClientException e) {
assertTrue(e.getCause() instanceof SchemaSerializationException);
}

}

@Test
Expand Down Expand Up @@ -305,6 +317,67 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex
}
}

/**
 * Publishes pre-serialized V1 and V2 Avro payloads through {@code Schema.NATIVE_AVRO} on a topic
 * that ends up with two schema versions, and checks that a V2 consumer sees both records with the
 * expected schema versions. Finally sends with {@code Schema.BYTES} and checks that the outcome
 * matches the {@code schemaValidationEnforced} setting of the fixture.
 */
@Test
public void newNativeAvroProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception {
    final String topicName = "my-property/my-ns/schema-test";

    // Native Avro definitions are derived from the Pulsar AVRO schemas of the two POJOs.
    final org.apache.avro.Schema nativeV1 = new Parser().parse(
            new ByteArrayInputStream(Schema.AVRO(V1Data.class).getSchemaInfo().getSchema()));
    final Schema<V2Data> pulsarV2 = Schema.AVRO(V2Data.class);
    final org.apache.avro.Schema nativeV2 = new Parser().parse(
            new ByteArrayInputStream(pulsarV2.getSchemaInfo().getSchema()));

    final V1Data recordV1 = new V1Data(2);
    final V2Data recordV2 = new V2Data(3, 5);
    final byte[] encodedV1 = new AvroWriter<V1Data>(nativeV1).write(recordV1);
    final byte[] encodedV2 = new AvroWriter<V2Data>(nativeV2).write(recordV2);

    // Connect once with V1 and send nothing; the version assertions below expect V1 to be
    // version 0 and V2 to be version 1.
    try (Producer<byte[]> ignored = pulsarClient.newProducer(Schema.NATIVE_AVRO(nativeV1))
            .topic(topicName)
            .create()) {
    }
    try (Producer<byte[]> v2Producer = pulsarClient.newProducer(Schema.NATIVE_AVRO(nativeV2))
            .topic(topicName)
            .create()) {
        v2Producer.send(encodedV2);
    }

    try (Producer<byte[]> producer = pulsarClient.newProducer(Schema.NATIVE_AVRO(nativeV1))
                 .topic(topicName)
                 .create();
         Consumer<V2Data> consumer = pulsarClient.newConsumer(pulsarV2)
                 .topic(topicName)
                 .subscriptionName("sub1")
                 .subscribe()) {

        producer.newMessage(Schema.NATIVE_AVRO(nativeV1)).value(encodedV1).send();
        producer.newMessage(Schema.NATIVE_AVRO(nativeV2)).value(encodedV2).send();

        // First message was written with V1: field j is absent and therefore read as null.
        Message<V2Data> first = consumer.receive();
        V2Data firstValue = first.getValue();
        Assert.assertEquals(recordV1.i, firstValue.i);
        assertNull(firstValue.j);
        Assert.assertEquals(first.getSchemaVersion(), new LongSchemaVersion(0).bytes());

        Message<V2Data> second = consumer.receive();
        Assert.assertEquals(recordV2, second.getValue());
        Assert.assertEquals(second.getSchemaVersion(), new LongSchemaVersion(1).bytes());

        try {
            producer.newMessage(Schema.BYTES).value(encodedV1).send();
            if (schemaValidationEnforced) {
                Assert.fail("Shouldn't be able to send to a schema'd topic with no schema"
                        + " if SchemaValidationEnabled is enabled");
            }
            Message<V2Data> third = consumer.receive();
            Assert.assertEquals(third.getSchemaVersion(), SchemaVersion.Empty.bytes());
        } catch (PulsarClientException e) {
            // A client exception is only acceptable when validation is enforced.
            if (!schemaValidationEnforced) {
                Assert.fail("Shouldn't throw IncompatibleSchemaException"
                        + " if SchemaValidationEnforced is disabled");
            }
            Assert.assertTrue(e instanceof IncompatibleSchemaException);
        }
    }
}

@Test
public void newProducerForMessageOnTopicWithDifferentSchemaType() throws Exception {
String topic = "my-property/my-ns/schema-test";
Expand Down Expand Up @@ -374,6 +447,44 @@ public void newProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Excep
v2Schema.getSchemaInfo()));
}

/**
 * Starts from a producer and consumer created without a schema, then alternates V1 and V2
 * payloads sent through {@code Schema.NATIVE_AVRO}. Each message must arrive byte-for-byte with
 * schema version 0 (V1) or 1 (V2), and the topic must afterwards list exactly the V1 and V2
 * schema infos, in that order.
 */
@Test
public void newNativeAvroProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Exception {
    final String topicName = "my-property/my-ns/schema-test";

    final Schema<V1Data> pulsarV1 = Schema.AVRO(V1Data.class);
    final org.apache.avro.Schema nativeV1 = new Parser().parse(
            new ByteArrayInputStream(pulsarV1.getSchemaInfo().getSchema()));
    final AvroWriter<V1Data> writerV1 = new AvroWriter<>(nativeV1);

    final Schema<V2Data> pulsarV2 = Schema.AVRO(V2Data.class);
    final org.apache.avro.Schema nativeV2 = new Parser().parse(
            new ByteArrayInputStream(pulsarV2.getSchemaInfo().getSchema()));
    final AvroWriter<V2Data> writerV2 = new AvroWriter<>(nativeV2);

    try (Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topicName)
                 .create();
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionName("sub1")
                 .subscribe()) {
        // Two rounds: the second round re-uses the schema versions assigned in the first.
        for (int round = 0; round < 2; ++round) {
            byte[] encodedV1 = writerV1.write(new V1Data(round));
            byte[] encodedV2 = writerV2.write(new V2Data(round, -round));

            producer.newMessage(Schema.NATIVE_AVRO(nativeV1)).value(encodedV1).send();
            Message<byte[]> receivedV1 = consumer.receive();
            Assert.assertEquals(receivedV1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
            Assert.assertEquals(receivedV1.getData(), encodedV1);

            producer.newMessage(Schema.NATIVE_AVRO(nativeV2)).value(encodedV2).send();
            Message<byte[]> receivedV2 = consumer.receive();
            Assert.assertEquals(receivedV2.getSchemaVersion(), new LongSchemaVersion(1).bytes());
            Assert.assertEquals(receivedV2.getData(), encodedV2);
        }
    }

    List<SchemaInfo> registered = admin.schemas().getAllSchemas(topicName);
    List<SchemaInfo> expected = Arrays.asList(pulsarV1.getSchemaInfo(), pulsarV2.getSchemaInfo());
    Assert.assertEquals(registered, expected);
}

@Test
public void newProducerForMessageSchemaWithBatch() throws Exception {
String topic = "my-property/my-ns/schema-test";
Expand Down Expand Up @@ -426,6 +537,75 @@ public void newProducerForMessageSchemaWithBatch() throws Exception {
c.close();
}


/**
 * Sends 20 pre-serialized Avro payloads through a batching producer, switching between the V1
 * and V2 schema every 5 messages, and interleaves a synchronous send with an incompatible schema
 * after every third message. The consumer must then receive exactly the 20 V1/V2 payloads in
 * order.
 *
 * @throws Exception if any client operation fails
 */
@Test
public void newNativeAvroProducerForMessageSchemaWithBatch() throws Exception {
    String topic = "my-property/my-ns/schema-test";
    Schema<V1Data> v1Schema = Schema.AVRO(V1Data.class);
    byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema();
    org.apache.avro.Schema v1SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v1SchemaBytes));
    Schema<V2Data> v2Schema = Schema.AVRO(V2Data.class);
    byte[] v2SchemaBytes = v2Schema.getSchemaInfo().getSchema();
    org.apache.avro.Schema v2SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v2SchemaBytes));

    // The incompatible schema does not depend on the loop index, so derive and parse it once.
    Schema<IncompatibleData> incompatibleSchema = Schema.AVRO(IncompatibleData.class);
    byte[] incompatibleSchemaBytes = incompatibleSchema.getSchemaInfo().getSchema();
    org.apache.avro.Schema incompatibleSchemaAvroNative =
            new Parser().parse(new ByteArrayInputStream(incompatibleSchemaBytes));

    AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
            ReflectData.AllowNull.get().getSchema(V1Data.class));
    AvroWriter<V2Data> v2DataAvroWriter = new AvroWriter<>(
            ReflectData.AllowNull.get().getSchema(V2Data.class));
    AvroWriter<IncompatibleData> incompatibleDataAvroWriter = new AvroWriter<>(
            ReflectData.AllowNull.get().getSchema(IncompatibleData.class));
    // One reader per schema is enough for the whole receive loop.
    AvroReader<V1Data> v1Reader = new AvroReader<>(v1SchemaAvroNative);
    AvroReader<V2Data> v2Reader = new AvroReader<>(v2SchemaAvroNative);

    int total = 20;
    int batch = 5;
    int incompatible = 3;

    // try-with-resources: both the consumer and the (previously never closed) producer are
    // released even when an assertion fails part-way through.
    try (Consumer<byte[]> c = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("sub1").subscribe();
         Producer<byte[]> p = pulsarClient.newProducer(Schema.NATIVE_AVRO(v1SchemaAvroNative))
                 .topic(topic)
                 .enableBatching(true)
                 .batchingMaxPublishDelay(10, TimeUnit.SECONDS).create()) {
        for (int i = 0; i < total; ++i) {
            // Groups of `batch` consecutive messages alternate between V1 and V2.
            if (i / batch % 2 == 0) {
                byte[] content = v1DataAvroWriter.write(new V1Data(i));
                p.newMessage(Schema.NATIVE_AVRO(v1SchemaAvroNative))
                        .value(content).sendAsync();
            } else {
                byte[] content = v2DataAvroWriter.write(new V2Data(i, i + total));
                p.newMessage(Schema.NATIVE_AVRO(v2SchemaAvroNative))
                        .value(content).sendAsync();
            }
            if ((i + 1) % incompatible == 0) {
                byte[] content = incompatibleDataAvroWriter.write(new IncompatibleData(-i, -i));
                try {
                    p.newMessage(Schema.NATIVE_AVRO(incompatibleSchemaAvroNative))
                            .value(content).send();
                    // NOTE(review): a send that unexpectedly succeeds is not failed explicitly
                    // here - confirm whether that is intended.
                } catch (Exception e) {
                    Assert.assertTrue(e instanceof IncompatibleSchemaException, e.getMessage());
                }
            }
        }
        p.flush();

        for (int i = 0; i < total; ++i) {
            byte[] raw = c.receive().getData();
            if (i / batch % 2 == 0) {
                V1Data value = v1Reader.read(raw);
                Assert.assertEquals(value.i, i);
            } else {
                V2Data value = v2Reader.read(raw);
                Assert.assertEquals(value, new V2Data(i, i + total));
            }
        }
    }
}

@Test
public void newProducerWithMultipleSchemaDisabled() throws Exception {
String topic = "my-property/my-ns/schema-test";
Expand Down
Expand Up @@ -29,9 +29,12 @@
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;

import org.apache.avro.Schema.Parser;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -63,6 +66,7 @@
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -744,6 +748,13 @@ public void testProducerMultipleSchemaMessages() throws Exception {
producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
producer.newMessage(Schema.BOOL).value(true).send();

Schema<Schemas.PersonThree> personThreeSchema = Schema.AVRO(Schemas.PersonThree.class);
byte[] personThreeSchemaBytes = personThreeSchema.getSchemaInfo().getSchema();
org.apache.avro.Schema personThreeSchemaAvroNative = new Parser().parse(new ByteArrayInputStream(personThreeSchemaBytes));
AvroWriter<Schemas.PersonThree> writer = new AvroWriter<>(personThreeSchemaAvroNative);
byte[] content = writer.write(new Schemas.PersonThree(0, "ran"));
producer.newMessage(Schema.NATIVE_AVRO(personThreeSchemaAvroNative)).value(content).send();

List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
Assert.assertEquals(allSchemas.size(), 5);
Expand Down
Expand Up @@ -426,6 +426,18 @@ static Schema<byte[]> AUTO_PRODUCE_BYTES(Schema<?> schema) {
return DefaultImplementation.newAutoProduceSchema(schema);
}

/**
 * Builds a {@code byte[]} schema for payloads that are already Avro-serialized. The payload is
 * passed through without being validated against the given schema, which can be useful when
 * migrating data from existing event or message stores.
 *
 * @param schema the Avro schema describing the payload; declared as {@code Object}, presumably
 *               so this interface does not reference Avro types directly (callers in the tests
 *               pass an {@code org.apache.avro.Schema})
 * @return the schema instance created by {@code DefaultImplementation}
 * @since 2.9.0
 */
static Schema<byte[]> NATIVE_AVRO(Object schema) {
    final Schema<byte[]> nativeAvroSchema = DefaultImplementation.newAutoProduceValidatedAvroSchema(schema);
    return nativeAvroSchema;
}

// CHECKSTYLE.ON: MethodName

static Schema<?> getSchema(SchemaInfo schemaInfo) {
Expand Down
Expand Up @@ -298,6 +298,13 @@ public static Schema<byte[]> newAutoProduceSchema(Schema<?> schema) {
.newInstance(schema));
}

/**
 * Reflectively instantiates {@code org.apache.pulsar.client.impl.schema.NativeAvroBytesSchema}
 * through its single-argument {@code Object} constructor.
 *
 * @param schema the value handed to the constructor unchanged
 * @return the newly created schema, cast to {@code Schema<byte[]>}
 */
public static Schema<byte[]> newAutoProduceValidatedAvroSchema(Object schema) {
    return catchExceptions(() -> {
        // Look the implementation class up by name and invoke its (Object) constructor.
        Object instance = getConstructor(
                "org.apache.pulsar.client.impl.schema.NativeAvroBytesSchema", Object.class)
                .newInstance(schema);
        return (Schema<byte[]>) instance;
    });
}

public static Schema<KeyValue<byte[], byte[]>> newKeyValueBytesSchema() {
return catchExceptions(
() -> (Schema<KeyValue<byte[], byte[]>>) getStaticMethod(
Expand Down

0 comments on commit a78b029

Please sign in to comment.