Skip to content

PIP 85 : Add Schema Information to Message in Java Client API

Enrico Olivelli edited this page May 12, 2021 · 2 revisions

PIP-85 Add Schema Information to Message in Java Client API

Status: Accepted

Author: Enrico Olivelli

Pull Request: https://github.com/apache/pulsar/pull/10476

Mailing List discussion: https://lists.apache.org/thread.html/rb705524b64c72ffafb6e926be5560dc07bea8cedce169f875e7cb725%40%3Cdev.pulsar.apache.org%3E

Github issue: https://github.com/apache/pulsar/issues/10438

Release: 2.8.0

Motivation

Currently, in Pulsar 2.7.x. you do not have access to the Schema of a Message when you receive it using the Consumer and the Reader API.

Use cases that would benefit from this API:

  • Consuming messages using Schema.AUTO_CONSUME: because in that case you do not know at compile time the Schema of the Message
  • General purpose code that manages Messages without a strict coupling with the code that receives the message.

It should also be noted that even if Pulsar handles schemas at topic level, each Message can have a different schema, so downloading once the latest schema bound to a topic is not enough for advanced use cases.

Public Interfaces

We add a new Optional<Schema<?> getReaderSchema() method to the Message interface.

interface Message<T> { Optional<Schema<?>> getReaderSchema(); }

For topics that do not have a schema we return an empty Optional, and we return a Schema that represents the Schema used to produce the Message, at the specific version.

The Schema is the Schema used for reading the Message, in case of Schema.AUTO_CONSUME() this is the same schema used by the writer. A similar API is already present on Pulsar IO Record interface: Record.getSchema().

Please note that even if the Message is bound to generic type T (for instance Message<String>, Message<GenericRecord>) we are going to return a Schema<?> because especially in case of Schema.AUTO_CONSUME() we want to return the exact Schema used to encode the payload.

A particular case of interest is Message (or Message that basically is the same), that is what you get when you use Schema.AUTO_CONSUME(): the value of the Message is a wrapper around another object, specifically:

  • Struct types (GenericRecord, so AVRO, JSON and Protobuf)
  • Primitive types (String, int32…)
  • KeyValue, both with INLINE and SEPARATED key encoding

You access the decoded value using GenericObject.getNativeObject() method and you get the SchemaType using GenericObject.getSchemaType().

In this case the Schema (AutoConsumeSchema) is a wrapper with this features: it automatically downloads the schema associated with the topic it is able to deal with multiple versions of the same schema See below for a detailed explanation.

We introduce a method in the SchemaReader API. interface SchemaReader<T> { Object getNativeSchema() }

These two methods are needed to support narrowing a Schema to a specific version, see the details below.

Proposed Changes

This PIP introduces the new Message.getReaderSchema() API and the necessary implementation.

Requirements for the Schema instance returned by Message.getReaderSchema:

  • Schema.getSchemaInfo() must return the correct schema information: type, properties, schema definition.

  • Schema.getNativeSchema() must return a Native Schema when possible (for instance a AVRO Schema): this allows users to access low level information for the Schema.

  • Schema.decode(payload) (without an explicit schemaVersion) must decode the payload using the same schemaVersion associated with the Message.

AutoConsumeSchema will not simply return the internal cached Schema: this is not enough in case of Schema that support multi versioning because the wrapped schema for AutoConsumeSchema returns the latest version of the Schema that the time of bootstrapping the Consumer.

In order to return a Schema at the correct version we must add a new method to the internal class AbstractSchema, this is not part of the public API.

abstract class AbstractSchema<T> {
      /**
       * Return an instance of this schema at the given version.
       * @param schemaVersion the version
       * @return the schema at that specific version
       * @throws SchemaSerializationException in case of unknown schema version
       * @throws NullPointerException in case of null schemaVersion
       */````
    public Schema<?> atSchemaVersion(byte[] schemaVersion)
                                    throws SchemaSerializationException {
       Objects.requireNonNull(schemaVersion);
       if (!supportSchemaVersioning()) {
           return this;`
       } else {
           throw new SchemaSerializationException("Not implemented for " + this.getClass());
    }
}

This new method returns the schema itself in case of a schema that does not support multiple versions (supportsSchemaVersioning) and it returns a new schema instance that fulfils the requirements of this PIP.

Caching the value may be possible because the Schema objects should be immutable, but this is not always the case, so adding this cache is out of the scope of this PIP, we can introduce it if needed.

By the way we are able to cache all of the underlying structures that come with a cost: access to the Schema Registry and parsing the Schema definition.

Most of the handling of multi version schemas is in AbstractStructSchema, and this class relies on AbstractMultiVersionReader and SchemaInfoProvider in order to download new versions of the schema when needed.

So the main implementation of atSchemaVersion is in AbstractStructSchema.

Key points of the implementation of AbstractStructSchema.atSchemaVersion:

  • it must leverage existing cache in SchemaInfoProvider for getSchemaInfo(): so download the Schema from the Registry only once
  • it must leverage existing cache in AbstractMultiVersionReader for getNativeSchema(): so parse the Native Schema only once
  • it must implement decode(byte[]) in order to call decode(byte[], schemaVersion)

In order to let AbstractStructSchema access the Native Schema from AbstractMultiVersionReader we must add a new API method in SchemaReader

interface SchemaReader {
 default Optional<Object> getNativeSchema() {
    return Optional.empty();
 }
}

This way every SchemaReader may report the underlying Native Schema.

Compatibility, Deprecation, and Migration Plan

No compatibility issues in the Public API. Please note that this PIP renames existing MessageImpl.getSchema() to MessageImpl.getSchemaInternal(), in order to not change the behaviour of existing code.

Test Plan

We are going to enrich existing tests about Schema Management (like SchemaTest.java), by adding additional checks about Message.getReaderSchema() and the ability to access Schema.getNativeSchema() from the returned Schema.

We are going to add new tests about Schema versioning, Message.getReaderSchema() and Schema.getNativeSchema() using Schema.AUTO_CONSUME().

For instance:

  • write with an Avro Schema1
  • write with an Avro Schema2
  • receive Message1
  • receive Message2
  • verify getSchema() and getSchema().getSchemaInfo) and getNativeSchema() on Message1
  • verify getSchema() and getSchema().getSchemaInfo) and getNativeSchema() on Message2

Please note that if you consume Message2, then the “latest” schema is Avro Schema2 but the Schema returned by Message1 must be AvroSchema1.

The important point here is that every message reports the exact schema.

Rejected Alternatives

Simply expose MessageImpl.getSchema() (and TopicMessageImpl.getSchema())

This won’t work with Schema.AUTO_CONSUME() because MessageImpl.getSchema() returns AutoConsumeSchema that is a special schema that wraps another schema.

We need to return the Schema at a specific SchemaVersion, so we need to unwrap the underlying schema and in case of a Schema that supports versioning we must return the exact version of the Schema.

Add a new MultiversionSchema interface

We could not add the atSchemaVersion method to the Schema interface, and add a new interface that exposes atSchemaVersion (and make AbstractStructSchema and AutoConsumeSchema implement that interface) but the Schema interface already provides a “decode(byte[], schemaVersion)” so every Schema is already “version aware” from the point of view of the API.

Future works

Enhance Pulsar IO Sinks in order to return the correct Schema information in Record.getSchema(), by leveraging Message.getReaderSchema().

The case is PulsarRecord.java in which we expose the Message Schema, and we access the Schema wrapped by AutoConsumeSchema (in the current master branch of Pulsar, before this PIP). We must call Message.getReaderSchema() in order to access accurate schema information.

Clone this wiki locally