diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 04516c0fb107b..77292f8207f11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -39,6 +39,7 @@ import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -72,6 +73,7 @@ import lombok.Cleanup; import lombok.Data; import lombok.EqualsAndHashCode; +import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -88,6 +90,7 @@ import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.client.impl.schema.writer.AvroWriter; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -3997,6 +4000,49 @@ public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) th assertEquals(1, res.getFields().size()); } + @Test(dataProvider = "enableBatching") + public void testNativeAvroSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception { + log.info("-- Starting {} test --", methodName); + + final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed"; + Consumer consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .topic(topic) + .subscriptionName("testsub") + .subscribe(); + + // initially we are not setting a Schema in the producer + Producer producer = pulsarClient.newProducer().topic(topic) + .enableBatching(enableBatching) + .compressionType(CompressionType.LZ4) + .create(); + MyBean payload = new MyBean(); + payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa"); + + // now we send with a schema, but we have enabled compression and batching + // the producer will have to setup the schema and resume the send + Schema myBeanSchema = Schema.AVRO(MyBean.class); + byte[] schemaBytes = myBeanSchema.getSchemaInfo().getSchema(); + org.apache.avro.Schema schemaAvroNative = new Parser().parse(new ByteArrayInputStream(schemaBytes)); + AvroWriter writer = new AvroWriter<>(schemaAvroNative); + byte[] content = writer.write(payload); + producer.newMessage(Schema.NATIVE_AVRO(schemaAvroNative)).value(content).send(); + producer.close(); + + GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue(); + consumer.close(); + assertEquals(SchemaType.AVRO, res.getSchemaType()); + org.apache.avro.generic.GenericRecord nativeRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject(); + org.apache.avro.Schema schema = nativeRecord.getSchema(); + for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) { + log.info("field {} {}", f.getName(), res.getField(f)); + assertEquals("field", f.getName()); + assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f)); + assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", nativeRecord.get(f.getName()).toString()); + } + + assertEquals(1, res.getFields().size()); + } + @DataProvider(name = "avroSchemaProvider") public static Object[] avroSchemaProvider() { return new Object[]{Schema.AVRO(MyBean.class), Schema.JSON(MyBean.class)}; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index 70c40872dc573..fd8036eaf9e3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.impl.HttpLookupService; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.schema.reader.AvroReader; import org.apache.pulsar.client.impl.schema.writer.AvroWriter; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.SchemaVersion; @@ -242,6 +243,17 @@ public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception { } catch (PulsarClientException e) { assertTrue(e.getCause() instanceof SchemaSerializationException); } + + Schema v1Schema = Schema.AVRO(V1Data.class); + byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema(); + org.apache.avro.Schema v1SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v1SchemaBytes)); + // if using NATIVE_AVRO, producer can connect but the publish will fail + try (Producer p = pulsarClient.newProducer(Schema.NATIVE_AVRO(v1SchemaAvroNative)).topic(topic).create()) { + p.send("junkdata".getBytes(UTF_8)); + } catch (PulsarClientException e) { + assertTrue(e.getCause() instanceof SchemaSerializationException); + } + } @Test @@ -305,6 +317,67 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex } } + @Test + public void newNativeAvroProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception { + String topic = "my-property/my-ns/schema-test"; + Schema v1Schema = Schema.AVRO(V1Data.class); + byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema(); + org.apache.avro.Schema v1SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v1SchemaBytes)); + AvroWriter v1Writer = new AvroWriter<>(v1SchemaAvroNative); + Schema v2Schema = Schema.AVRO(V2Data.class); + byte[] v2SchemaBytes = v2Schema.getSchemaInfo().getSchema(); + org.apache.avro.Schema v2SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)); + AvroWriter v2Writer = new AvroWriter<>(v2SchemaAvroNative); + V1Data dataV1 = new V1Data(2); + V2Data dataV2 = new V2Data(3, 5); + byte[] contentV1 = v1Writer.write(dataV1); + byte[] contentV2 = v2Writer.write(dataV2); + try (Producer ignored = pulsarClient.newProducer(Schema.NATIVE_AVRO(v1SchemaAvroNative)) + .topic(topic).create()) { + } + try (Producer p = pulsarClient.newProducer(Schema.NATIVE_AVRO(v2SchemaAvroNative)) + .topic(topic).create()) { + p.send(contentV2); + } + try (Producer p = pulsarClient.newProducer(Schema.NATIVE_AVRO(v1SchemaAvroNative)) + .topic(topic).create(); + Consumer c = pulsarClient.newConsumer(v2Schema) + .topic(topic) + .subscriptionName("sub1").subscribe()) { + + p.newMessage(Schema.NATIVE_AVRO(v1SchemaAvroNative)) + .value(contentV1).send(); + p.newMessage(Schema.NATIVE_AVRO(v2SchemaAvroNative)) + .value(contentV2).send(); + Message msg1 = c.receive(); + V2Data msg1Value = msg1.getValue(); + Assert.assertEquals(dataV1.i, msg1Value.i); + assertNull(msg1Value.j); + Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes()); + + Message msg2 = c.receive(); + Assert.assertEquals(dataV2, msg2.getValue()); + Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes()); + + try { + p.newMessage(Schema.BYTES).value(contentV1).send(); + if (schemaValidationEnforced) { + Assert.fail("Shouldn't be able to send to a schema'd topic with no schema" + + " if SchemaValidationEnabled is enabled"); + } + Message msg3 = c.receive(); + Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes()); + } catch (PulsarClientException e) { + if (schemaValidationEnforced) { + Assert.assertTrue(e instanceof IncompatibleSchemaException); + } else { + Assert.fail("Shouldn't throw IncompatibleSchemaException" + + " if SchemaValidationEnforced is disabled"); + } + } + } + } + @Test public void newProducerForMessageOnTopicWithDifferentSchemaType() throws Exception { String topic = "my-property/my-ns/schema-test"; @@ -374,6 +447,44 @@ public void newProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Excep v2Schema.getSchemaInfo())); } + @Test + public void newNativeAvroProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Exception { + String topic = "my-property/my-ns/schema-test"; + Schema v1Schema = Schema.AVRO(V1Data.class); + byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema(); + org.apache.avro.Schema v1SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v1SchemaBytes)); + AvroWriter v1Writer = new AvroWriter<>(v1SchemaAvroNative); + Schema v2Schema = Schema.AVRO(V2Data.class); + byte[] v2SchemaBytes = v2Schema.getSchemaInfo().getSchema(); + org.apache.avro.Schema v2SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)); + AvroWriter v2Writer = new AvroWriter<>(v2SchemaAvroNative); + + try (Producer p = pulsarClient.newProducer() + .topic(topic).create(); + Consumer c = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub1").subscribe()) { + for (int i = 0; i < 2; ++i) { + V1Data dataV1 = new V1Data(i); + V2Data dataV2 = new V2Data(i, -i); + byte[] contentV1 = v1Writer.write(dataV1); + byte[] contentV2 = v2Writer.write(dataV2); + p.newMessage(Schema.NATIVE_AVRO(v1SchemaAvroNative)).value(contentV1).send(); + Message msg1 = c.receive(); + Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes()); + Assert.assertEquals(msg1.getData(), contentV1); + p.newMessage(Schema.NATIVE_AVRO(v2SchemaAvroNative)).value(contentV2).send(); + Message msg2 = c.receive(); + Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes()); + Assert.assertEquals(msg2.getData(), contentV2); + } + } + + List allSchemas = admin.schemas().getAllSchemas(topic); + Assert.assertEquals(allSchemas, Arrays.asList(v1Schema.getSchemaInfo(), + v2Schema.getSchemaInfo())); + } + @Test public void newProducerForMessageSchemaWithBatch() throws Exception { String topic = "my-property/my-ns/schema-test"; @@ -426,6 +537,75 @@ public void newProducerForMessageSchemaWithBatch() throws Exception { c.close(); } + + @Test + public void newNativeAvroProducerForMessageSchemaWithBatch() throws Exception { + String topic = "my-property/my-ns/schema-test"; + Schema v1Schema = Schema.AVRO(V1Data.class); + byte[] v1SchemaBytes = v1Schema.getSchemaInfo().getSchema(); + org.apache.avro.Schema v1SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v1SchemaBytes)); + AvroWriter v1Writer = new AvroWriter<>(v1SchemaAvroNative); + Schema v2Schema = Schema.AVRO(V2Data.class); + byte[] v2SchemaBytes = v2Schema.getSchemaInfo().getSchema(); + org.apache.avro.Schema v2SchemaAvroNative = new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)); + AvroWriter v2Writer = new AvroWriter<>(v2SchemaAvroNative); + + Consumer c = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("sub1").subscribe(); + Producer p = pulsarClient.newProducer(Schema.NATIVE_AVRO(v1SchemaAvroNative)) + .topic(topic) + .enableBatching(true) + .batchingMaxPublishDelay(10, TimeUnit.SECONDS).create(); + AvroWriter v1DataAvroWriter = new AvroWriter<>( + ReflectData.AllowNull.get().getSchema(V1Data.class)); + AvroWriter v2DataAvroWriter = new AvroWriter<>( + ReflectData.AllowNull.get().getSchema(V2Data.class)); + AvroWriter incompatibleDataAvroWriter = new AvroWriter<>( + ReflectData.AllowNull.get().getSchema(IncompatibleData.class)); + int total = 20; + int batch = 5; + int incompatible = 3; + for (int i = 0; i < total; ++i) { + if (i / batch % 2 == 0) { + byte[] content = v1DataAvroWriter.write(new V1Data(i)); + p.newMessage(Schema.NATIVE_AVRO(v1SchemaAvroNative)) + .value(content).sendAsync(); + } else { + byte[] content = v2DataAvroWriter.write(new V2Data(i, i + total)); + p.newMessage(Schema.NATIVE_AVRO(v2SchemaAvroNative)) + .value(content).sendAsync(); + } + if ((i + 1) % incompatible == 0) { + Schema incompatibleSchema = Schema.AVRO(IncompatibleData.class); + byte[] incompatibleSchemaBytes = incompatibleSchema.getSchemaInfo().getSchema(); + org.apache.avro.Schema incompatibleSchemaAvroNative = new Parser().parse(new ByteArrayInputStream(incompatibleSchemaBytes)); + byte[] content = incompatibleDataAvroWriter.write(new IncompatibleData(-i, -i)); + try { + p.newMessage(Schema.NATIVE_AVRO(incompatibleSchemaAvroNative)) + .value(content).send(); + } catch (Exception e) { + Assert.assertTrue(e instanceof IncompatibleSchemaException, e.getMessage()); + } + } + } + p.flush(); + + for (int i = 0; i < total; ++i) { + byte[] raw = c.receive().getData(); + if (i / batch % 2 == 0) { + AvroReader reader = new AvroReader<>(v1SchemaAvroNative); + V1Data value = reader.read(raw); + Assert.assertEquals(value.i, i); + } else { + AvroReader reader = new AvroReader<>(v2SchemaAvroNative); + V2Data value = reader.read(raw); + Assert.assertEquals(value, new V2Data(i, i + total)); + } + } + c.close(); + } + @Test public void newProducerWithMultipleSchemaDisabled() throws Exception { String topic = "my-property/my-ns/schema-test"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index dec50dcdfdb76..dab9d08ac143e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -29,9 +29,12 @@ import static org.testng.Assert.fail; import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; +import org.apache.avro.Schema.Parser; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Sets; +import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; @@ -63,6 +66,7 @@ import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; +import org.apache.pulsar.client.impl.schema.writer.AvroWriter; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -744,6 +748,13 @@ public void testProducerMultipleSchemaMessages() throws Exception { producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send(); producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send(); producer.newMessage(Schema.BOOL).value(true).send(); + + Schema personThreeSchema = Schema.AVRO(Schemas.PersonThree.class); + byte[] personThreeSchemaBytes = personThreeSchema.getSchemaInfo().getSchema(); + org.apache.avro.Schema personThreeSchemaAvroNative = new Parser().parse(new ByteArrayInputStream(personThreeSchemaBytes)); + AvroWriter writer = new AvroWriter<>(personThreeSchemaAvroNative); + byte[] content = writer.write(new Schemas.PersonThree(0, "ran")); + producer.newMessage(Schema.NATIVE_AVRO(personThreeSchemaAvroNative)).value(content).send(); List allSchemas = admin.schemas().getAllSchemas(topic); Assert.assertEquals(allSchemas.size(), 5); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index 69fcb00ee2a93..6cf9ccdc794be 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -426,6 +426,18 @@ static Schema AUTO_PRODUCE_BYTES(Schema schema) { return DefaultImplementation.newAutoProduceSchema(schema); } + /** + * Create a schema instance that accepts a serialized Avro payload + * without validating it against the schema specified. + * It can be useful when migrating data from existing event or message stores. + * + * @return the auto schema instance + * @since 2.9.0 + */ + static Schema NATIVE_AVRO(Object schema) { + return DefaultImplementation.newAutoProduceValidatedAvroSchema(schema); + } + // CHECKSTYLE.ON: MethodName static Schema getSchema(SchemaInfo schemaInfo) { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java index c0fa4d6743a4a..a6020d67c567d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java @@ -298,6 +298,13 @@ public static Schema newAutoProduceSchema(Schema schema) { .newInstance(schema)); } + public static Schema newAutoProduceValidatedAvroSchema(Object schema) { + return catchExceptions( + () -> (Schema) getConstructor( + "org.apache.pulsar.client.impl.schema.NativeAvroBytesSchema", Object.class) + .newInstance(schema)); + } + public static Schema> newKeyValueBytesSchema() { return catchExceptions( () -> (Schema>) getStaticMethod( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/NativeAvroBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/NativeAvroBytesSchema.java new file mode 100644 index 0000000000000..4ee8682258fc0 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/NativeAvroBytesSchema.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.util.Optional; + +/** + * Schema from a native Apache Avro schema. + * This class is supposed to be used on the producer side for working with existing data serialized in Avro, + * possibly stored in another system like Kafka. + * For this reason, it will not perform bytes validation against the schema in encoding and decoding, + * which are just identify functions. + * This class also makes it possible for users to bring in their own Avro serialization method. + */ +public class NativeAvroBytesSchema implements Schema { + + private Schema schema; + private org.apache.avro.Schema nativeSchema; + + public NativeAvroBytesSchema(org.apache.avro.Schema schema) { + setSchema(schema); + } + + public NativeAvroBytesSchema(Object schema) { + this(validateSchema(schema)); + } + + public void setSchema(org.apache.avro.Schema schema) { + SchemaDefinition schemaDefinition = SchemaDefinition.builder().withJsonDef(schema.toString(false)).build(); + this.nativeSchema = schema; + this.schema = AvroSchema.of(schemaDefinition); + } + + public boolean schemaInitialized() { + return schema != null; + } + + private static org.apache.avro.Schema validateSchema (Object schema) { + if (! (schema instanceof org.apache.avro.Schema)) + throw new IllegalArgumentException("The input schema is not of type 'org.apache.avro.Schema'."); + return (org.apache.avro.Schema) schema; + } + + private void ensureSchemaInitialized() { + checkState(schemaInitialized(), "Schema is not initialized before used"); + } + + @Override + public byte[] encode(byte[] message) { + ensureSchemaInitialized(); + + return message; + } + + /* decode should not be used because this is a Schema to be used on the Producer side */ + @Override + public byte[] decode(byte[] bytes, byte[] schemaVersion) { + throw new UnsupportedOperationException(); + } + + @Override + public SchemaInfo getSchemaInfo() { + ensureSchemaInitialized(); + + return schema.getSchemaInfo(); + } + + @Override + public Optional getNativeSchema() { + return Optional.of(this.nativeSchema); + } + + @Override + public Schema clone() { + return new NativeAvroBytesSchema(nativeSchema); + } + +}