diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml index 830c82fca264e..c6fdc3f5747f6 100644 --- a/pulsar-functions/runtime-all/pom.xml +++ b/pulsar-functions/runtime-all/pom.xml @@ -41,6 +41,7 @@ 5. log4j-slf4j-impl 6. log4j-api 7. log4j-core + 8. AVRO --> pulsar-functions-runtime-all @@ -65,6 +66,19 @@ ${project.version} + + + org.apache.avro + avro + ${avro.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.databind.version} + + diff --git a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java index 826f546999c8a..859be4e7aa79e 100644 --- a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java +++ b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java @@ -44,6 +44,9 @@ * 5. log4j-slf4j-impl * 6. log4j-api * 7. log4j-core + * 8. Apache AVRO + * 9. Jackson Mapper and Databind (dependency of AVRO) + * 10. Apache Commons Compress (dependency of AVRO) */ public class JavaInstanceDepsTest { @@ -62,6 +65,7 @@ public void testInstanceJarDeps() throws IOException { String name = e.getName(); if (name.endsWith(".class") && !name.startsWith("META-INF") && !name.equals("module-info.class")) { // The only classes in the java-instance.jar should be org.apache.pulsar, slf4j, and log4j classes + // (see the full list above) // filter out those classes to see if there are any other classes that should not be allowed if (!name.startsWith("org/apache/pulsar") && !name.startsWith("org/slf4j") diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java index d131e5b1c1b98..fa8a3fe2211c5 100644 --- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.tests.integration.io; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; @@ -67,7 +69,7 @@ public void write(Record record) { } log.info("value {}", record.getValue()); log.info("value schema type {}", record.getValue().getSchemaType()); - log.info("value native object {}", record.getValue().getNativeObject()); + log.info("value native object {} class {}", record.getValue().getNativeObject(), record.getValue().getNativeObject().getClass()); String expectedSchemaDefinition = record.getProperties().getOrDefault("expectedSchemaDefinition", ""); log.info("schemaDefinition {}", record.getSchema().getSchemaInfo().getSchemaDefinition()); @@ -79,6 +81,18 @@ public void write(Record record) { } } + // testing that actually the Sink is able to use Native AVRO + if (record.getSchema().getSchemaInfo().getType() == SchemaType.AVRO) { + GenericRecord nativeGenericRecord = (GenericRecord) record.getValue().getNativeObject(); + log.info("Schema from AVRO generic object {}", nativeGenericRecord.getSchema()); + } + + // testing that actually the Sink is able to use Native JSON + if (record.getSchema().getSchemaInfo().getType() == SchemaType.JSON) { + JsonNode nativeGenericRecord = (JsonNode) record.getValue().getNativeObject(); + log.info("NodeType from JsonNode generic object {}", nativeGenericRecord.getNodeType()); + } + record.ack(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java index a728f5b8b59db..fc266a54dde51 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java @@ -108,6 +108,7 @@ public void testGenericObjectSink() throws Exception { List specs = Arrays.asList( new SinkSpec("test-kv-sink-input-string-" + randomName(8), Schema.STRING, "foo"), new SinkSpec("test-kv-sink-input-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()), + new SinkSpec("test-kv-sink-input-json-" + randomName(8), Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build()), new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)), new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8),