From 702c8cb79d49ce0f395989b0eb77520028ecfb54 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 20 Jul 2021 05:11:26 +0200 Subject: [PATCH] Pulsar IO: allow Sinks to use native AVRO and JSON (#11322) The Sink should be able to manage the result of GenericRecord.getNativeObject(). In order to do this Apache AVRO must be loaded from the same classloader that is loading Apache Pulsar Runtime. The same problem applies in the case of a JsonNode returned by getNativeObject(); *Modifications* - Add AVRO to the list of classes (like slf4j) to be loaded from the Pulsar runtime (this in turn imports Commons Compress and Jackson Databind). - Enhance the existing integration tests, that tested about the Schema definition, but it didn't actually try to "use" the `org.apache.avro.GenericRecord` object as well as the `Jackson JsonNode` object (cherry picked from commit f35766f99fcc56493be9d12533f4b2a31c3c5884) --- pulsar-functions/runtime-all/pom.xml | 14 ++++++++++++++ .../functions/instance/JavaInstanceDepsTest.java | 9 ++++++++- .../integration/io/TestGenericObjectSink.java | 16 +++++++++++++++- .../io/PulsarGenericObjectSinkTest.java | 1 + 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml index c3f1994aaa637..d50b0cb5bf249 100644 --- a/pulsar-functions/runtime-all/pom.xml +++ b/pulsar-functions/runtime-all/pom.xml @@ -41,6 +41,7 @@ 5. log4j-slf4j-impl 6. log4j-api 7. log4j-core + 8. AVRO --> pulsar-functions-runtime-all @@ -65,6 +66,19 @@ ${project.version} + + + org.apache.avro + avro + ${avro.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.databind.version} + + diff --git a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java index 3bdd23f96d98e..859be4e7aa79e 100644 --- a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java +++ b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java @@ -44,6 +44,9 @@ * 5. log4j-slf4j-impl * 6. log4j-api * 7. log4j-core + * 8. Apache AVRO + * 9. Jackson Mapper and Databind (dependency of AVRO) + * 10. Apache Commons Compress (dependency of AVRO) */ public class JavaInstanceDepsTest { @@ -60,11 +63,15 @@ public void testInstanceJarDeps() throws IOException { if (e == null) break; String name = e.getName(); - if (name.endsWith(".class") && !name.startsWith("META-INF")) { + if (name.endsWith(".class") && !name.startsWith("META-INF") && !name.equals("module-info.class")) { // The only classes in the java-instance.jar should be org.apache.pulsar, slf4j, and log4j classes + // (see the full list above) // filter out those classes to see if there are any other classes that should not be allowed if (!name.startsWith("org/apache/pulsar") && !name.startsWith("org/slf4j") + && !name.startsWith("org/apache/avro") + && !name.startsWith("com/fasterxml/jackson") + && !name.startsWith("org/apache/commons/compress") && !name.startsWith("org/apache/logging/slf4j") && !name.startsWith("org/apache/logging/log4j")) { notAllowedClasses.add(name); diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java index d131e5b1c1b98..fa8a3fe2211c5 100644 --- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.tests.integration.io; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; @@ -67,7 +69,7 @@ public void write(Record record) { } log.info("value {}", record.getValue()); log.info("value schema type {}", record.getValue().getSchemaType()); - log.info("value native object {}", record.getValue().getNativeObject()); + log.info("value native object {} class {}", record.getValue().getNativeObject(), record.getValue().getNativeObject().getClass()); String expectedSchemaDefinition = record.getProperties().getOrDefault("expectedSchemaDefinition", ""); log.info("schemaDefinition {}", record.getSchema().getSchemaInfo().getSchemaDefinition()); @@ -79,6 +81,18 @@ public void write(Record record) { } } + // testing that actually the Sink is able to use Native AVRO + if (record.getSchema().getSchemaInfo().getType() == SchemaType.AVRO) { + GenericRecord nativeGenericRecord = (GenericRecord) record.getValue().getNativeObject(); + log.info("Schema from AVRO generic object {}", nativeGenericRecord.getSchema()); + } + + // testing that actually the Sink is able to use Native JSON + if (record.getSchema().getSchemaInfo().getType() == SchemaType.JSON) { + JsonNode nativeGenericRecord = (JsonNode) record.getValue().getNativeObject(); + log.info("NodeType from JsonNode generic object {}", nativeGenericRecord.getNodeType()); + } + record.ack(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java index a728f5b8b59db..fc266a54dde51 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java @@ -108,6 +108,7 @@ public void testGenericObjectSink() throws Exception { List specs = Arrays.asList( new SinkSpec("test-kv-sink-input-string-" + randomName(8), Schema.STRING, "foo"), new SinkSpec("test-kv-sink-input-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()), + new SinkSpec("test-kv-sink-input-json-" + randomName(8), Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build()), new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)), new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8),