Skip to content

Commit

Permalink
Pulsar IO: allow Sinks to use native AVRO and JSON (apache#11322)
Browse files Browse the repository at this point in the history
The Sink should be able to manage the result of GenericRecord.getNativeObject().
In order to do this Apache AVRO must be loaded from the same classloader that is loading Apache Pulsar Runtime.

The same problem applies in the case of a JsonNode returned by getNativeObject();

*Modifications*
- Add AVRO to the list of classes (like slf4j) to be loaded from the Pulsar runtime (this in turn imports Commons Compress and Jackson Databind).
- Enhance the existing integration tests, that tested about the Schema definition, but it didn't actually try to "use" the `org.apache.avro.GenericRecord` object as well as the `Jackson JsonNode` object

(cherry picked from commit f35766f)
  • Loading branch information
eolivelli committed Apr 3, 2022
1 parent 55093ee commit 94780b2
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 1 deletion.
14 changes: 14 additions & 0 deletions pulsar-functions/runtime-all/pom.xml
Expand Up @@ -41,6 +41,7 @@
5. log4j-slf4j-impl
6. log4j-api
7. log4j-core
8. AVRO
-->

<artifactId>pulsar-functions-runtime-all</artifactId>
Expand All @@ -65,6 +66,19 @@
<version>${project.version}</version>
</dependency>

<!-- avro and its dependencies, with pinned version -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.databind.version}</version>
</dependency>

<!-- logging -->

<dependency>
Expand Down
Expand Up @@ -44,6 +44,9 @@
* 5. log4j-slf4j-impl
* 6. log4j-api
* 7. log4j-core
* 8. Apache AVRO
* 9. Jackson Mapper and Databind (dependency of AVRO)
* 10. Apache Commons Compress (dependency of AVRO)
*/
public class JavaInstanceDepsTest {

Expand All @@ -62,6 +65,7 @@ public void testInstanceJarDeps() throws IOException {
String name = e.getName();
if (name.endsWith(".class") && !name.startsWith("META-INF") && !name.equals("module-info.class")) {
// The only classes in the java-instance.jar should be org.apache.pulsar, slf4j, and log4j classes
// (see the full list above)
// filter out those classes to see if there are any other classes that should not be allowed
if (!name.startsWith("org/apache/pulsar")
&& !name.startsWith("org/slf4j")
Expand Down
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.tests.integration.io;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
Expand Down Expand Up @@ -67,7 +69,7 @@ public void write(Record<GenericObject> record) {
}
log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {}", record.getValue().getNativeObject());
log.info("value native object {} class {}", record.getValue().getNativeObject(), record.getValue().getNativeObject().getClass());

String expectedSchemaDefinition = record.getProperties().getOrDefault("expectedSchemaDefinition", "");
log.info("schemaDefinition {}", record.getSchema().getSchemaInfo().getSchemaDefinition());
Expand All @@ -79,6 +81,18 @@ public void write(Record<GenericObject> record) {
}
}

// testing that actually the Sink is able to use Native AVRO
if (record.getSchema().getSchemaInfo().getType() == SchemaType.AVRO) {
GenericRecord nativeGenericRecord = (GenericRecord) record.getValue().getNativeObject();
log.info("Schema from AVRO generic object {}", nativeGenericRecord.getSchema());
}

// testing that actually the Sink is able to use Native JSON
if (record.getSchema().getSchemaInfo().getType() == SchemaType.JSON) {
JsonNode nativeGenericRecord = (JsonNode) record.getValue().getNativeObject();
log.info("NodeType from JsonNode generic object {}", nativeGenericRecord.getNodeType());
}

record.ack();
}

Expand Down
Expand Up @@ -108,6 +108,7 @@ public void testGenericObjectSink() throws Exception {
List<SinkSpec> specs = Arrays.asList(
new SinkSpec("test-kv-sink-input-string-" + randomName(8), Schema.STRING, "foo"),
new SinkSpec("test-kv-sink-input-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-json-" + randomName(8), Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8),
Expand Down

0 comments on commit 94780b2

Please sign in to comment.