Skip to content

Commit

Permalink
Pulsar Functions: allow a Function<GenericObject,?> to access the ori…
Browse files Browse the repository at this point in the history
…ginal Schema of the Message and use it
  • Loading branch information
eolivelli committed Apr 3, 2022
1 parent 963cf6a commit 55093ee
Show file tree
Hide file tree
Showing 7 changed files with 416 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,18 @@ public Schema<?> getInternalSchema(byte[] schemaVersion) {
return schemaMap.get(getSchemaVersion(schemaVersion));
}

/**
* Get a specific schema version, fetching from the Registry if it is not loaded yet.
* This method is not intended to be used by applications.
* @param schemaVersion the version
* @return the Schema at the specific version
* @see #atSchemaVersion(byte[])
*/
public Schema<?> unwrapInternalSchema(byte[] schemaVersion) {
fetchSchemaIfNeeded(BytesSchemaVersion.of(schemaVersion));
return getInternalSchema(schemaVersion);
}

/**
* It may happen that the schema is not loaded but we need it, for instance in order to call getSchemaInfo()
* We cannot call this method in getSchemaInfo, because getSchemaInfo is called in many
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -119,6 +120,16 @@ protected Record<T> buildRecord(Consumer<T> consumer, Message<T> message) {
TopicMessageImpl impl = (TopicMessageImpl) message;
schema = impl.getSchemaInternal();
}

// we don't want the Function/Sink to see AutoConsumeSchema
if (schema instanceof AutoConsumeSchema) {
AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
// we cannot use atSchemaVersion, because atSchemaVersion is only
// able to decode data, here we want a Schema that
// is able to re-encode the payload when needed.
schema = (Schema<T>) autoConsumeSchema
.unwrapInternalSchema(message.getSchemaVersion());
}
return PulsarRecord.<T>builder()
.message(message)
.schema(schema)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.functions;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;

/**
* This function processes any message with any schema,
* and outputs the message with the same schema to another topic.
*/
@Slf4j
public class GenericObjectFunction implements Function<GenericObject, Void> {

@Override
public Void process(GenericObject genericObject, Context context) throws Exception {
Record<?> currentRecord = context.getCurrentRecord();
log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);
// do some processing...
final boolean isStruct;
switch (currentRecord.getSchema().getSchemaInfo().getType()) {
case AVRO:
case JSON:
case PROTOBUF_NATIVE:
isStruct = true;
break;
default:
isStruct = false;
break;
}
if (isStruct) {
// GenericRecord must stay wrapped
context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
.value(genericObject).send();
} else {
// primitives and KeyValue must be unwrapped
context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
.value(genericObject.getNativeObject()).send();
}
return null;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.functions;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;

import java.io.ByteArrayOutputStream;
import java.util.stream.Collectors;

/**
* This function removes a "field" from a AVRO message
*/
@Slf4j
public class RemoveAvroFieldFunction implements Function<GenericObject, Void> {

private static final String FIELD_TO_REMOVE = "age";

@Override
public Void process(GenericObject genericObject, Context context) throws Exception {
Record<?> currentRecord = context.getCurrentRecord();
log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
log.info("record with schema {} version {} {}", currentRecord.getSchema(),
currentRecord.getMessage().get().getSchemaVersion(),
currentRecord);
Object nativeObject = genericObject.getNativeObject();
Schema<?> schema = currentRecord.getSchema();

Schema outputSchema = schema;
Object outputObject = genericObject.getNativeObject();
boolean someThingDone = false;
if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;

Schema keySchema = kvSchema.getKeySchema();
Schema valueSchema = kvSchema.getValueSchema();
// remove a column "age" from the "valueSchema"
if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {

org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) valueSchema.getNativeSchema().get();
if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
originalAvroSchema.getFields().
stream()
.filter(f->!f.name().equals(FIELD_TO_REMOVE))
.map(f-> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
.collect(Collectors.toList()));

KeyValue originalObject = (KeyValue) nativeObject;

GenericRecord value = (GenericRecord) originalObject.getValue();
org.apache.avro.generic.GenericRecord genericRecord
= (org.apache.avro.generic.GenericRecord) value.getNativeObject();

org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
for (org.apache.avro.Schema.Field field : modified.getFields()) {
newRecord.put(field.name(), genericRecord.get(field.name()));
}
GenericDatumWriter writer = new GenericDatumWriter(modified);
ByteArrayOutputStream oo = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
writer.write(newRecord, encoder);
Object newValue = oo.toByteArray();

Schema newValueSchema = Schema.NATIVE_AVRO(modified);
outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());
outputObject = new KeyValue(originalObject.getKey(), newValue);
someThingDone = true;
}
}
} else if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
originalAvroSchema.getFields().
stream()
.filter(f -> !f.name().equals(FIELD_TO_REMOVE))
.map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
.collect(Collectors.toList()));

org.apache.avro.generic.GenericRecord genericRecord
= (org.apache.avro.generic.GenericRecord) nativeObject;
org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
for (org.apache.avro.Schema.Field field : modified.getFields()) {
newRecord.put(field.name(), genericRecord.get(field.name()));
}
GenericDatumWriter writer = new GenericDatumWriter(modified);
ByteArrayOutputStream oo = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
writer.write(newRecord, encoder);

Schema newValueSchema = Schema.NATIVE_AVRO(modified);
outputSchema = newValueSchema;
outputObject = oo.toByteArray();
someThingDone = true;
}
}

if (!someThingDone) {
// do some processing...
final boolean isStruct;
switch (currentRecord.getSchema().getSchemaInfo().getType()) {
case AVRO:
case JSON:
case PROTOBUF_NATIVE:
isStruct = true;
break;
default:
isStruct = false;
break;
}
if (isStruct) {
// GenericRecord must stay wrapped
outputObject = currentRecord.getValue();
} else {
// primitives and KeyValue must be unwrapped
outputObject = nativeObject;
}
}
log.info("output {} schema {}", outputObject, outputSchema);
context.newOutputMessage(context.getOutputTopic(), outputSchema)
.value(outputObject).send();
return null;
}
}

0 comments on commit 55093ee

Please sign in to comment.