diff --git a/site2/website-next/versioned_docs/version-2.7.2/schema-evolution-compatibility.md b/site2/website-next/versioned_docs/version-2.7.2/schema-evolution-compatibility.md new file mode 100644 index 0000000000000..6e08e164a05ae --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/schema-evolution-compatibility.md @@ -0,0 +1,205 @@ +--- +id: schema-evolution-compatibility +title: Schema evolution and compatibility +sidebar_label: "Schema evolution and compatibility" +original_id: schema-evolution-compatibility +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +Normally, schemas do not stay the same over a long period of time. Instead, they undergo evolutions to satisfy new needs. + +This chapter examines how Pulsar schema evolves and what Pulsar schema compatibility check strategies are. + +## Schema evolution + +Pulsar schema is defined in a data structure called `SchemaInfo`. + +Each `SchemaInfo` stored with a topic has a version. The version is used to manage the schema changes happening within a topic. + +The message produced with `SchemaInfo` is tagged with a schema version. When a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding `SchemaInfo` and use the correct schema information to deserialize data. + +### What is schema evolution? + +Schemas store the details of attributes and types. To satisfy new business requirements, you need to update schemas inevitably over time, which is called **schema evolution**. + +Any schema changes affect downstream consumers. Schema evolution ensures that the downstream consumers can seamlessly handle data encoded with both old schemas and new schemas. + +### How Pulsar schema should evolve? + +The answer is Pulsar schema compatibility check strategy. It determines how schema compares old schemas with new schemas in topics. 
+ +For more information, see [Schema compatibility check strategy](#schema-compatibility-check-strategy). + +### How does Pulsar support schema evolution? + +1. When a producer/consumer/reader connects to a broker, the broker deploys the schema compatibility checker configured by `schemaRegistryCompatibilityCheckers` to enforce schema compatibility check. + + The schema compatibility checker is one instance per schema type. + + Currently, Avro and JSON have their own compatibility checkers, while all the other schema types share the default compatibility checker which disables schema evolution. + +2. The producer/consumer/reader sends its client `SchemaInfo` to the broker. + +3. The broker knows the schema type and locates the schema compatibility checker for that type. + +4. The broker uses the checker to check if the `SchemaInfo` is compatible with the latest schema of the topic by applying its compatibility check strategy. + + Currently, the compatibility check strategy is configured at the namespace level and applied to all the topics within that namespace. + +## Schema compatibility check strategy + +Pulsar has 8 schema compatibility check strategies, which are summarized in the following table. + +Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest: + +| Compatibility check strategy | Definition | Changes allowed | Check against which schema | Upgrade first | +| --- | --- | --- | --- | --- | +| `ALWAYS_COMPATIBLE` | Disable schema compatibility check. | All changes are allowed | All previous versions | Any order | +| `ALWAYS_INCOMPATIBLE` | Disable schema evolution. | All changes are disabled | None | None | +| `BACKWARD` | Consumers using the schema V3 can process data written by producers using the schema V3 or V2. |
<li>Add optional fields</li><li>Delete fields</li> | Latest version | Consumers | +| `BACKWARD_TRANSITIVE` | Consumers using the schema V3 can process data written by producers using the schema V3, V2 or V1. | <li>Add optional fields</li><li>Delete fields</li> | All previous versions | Consumers | +| `FORWARD` | Consumers using the schema V3 or V2 can process data written by producers using the schema V3. | <li>Add fields</li><li>Delete optional fields</li> | Latest version | Producers | +| `FORWARD_TRANSITIVE` | Consumers using the schema V3, V2 or V1 can process data written by producers using the schema V3. | <li>Add fields</li><li>Delete optional fields</li> | All previous versions | Producers | +| `FULL` | Backward and forward compatible between the schema V3 and V2. | <li>Modify optional fields</li> | Latest version | Any order | +| `FULL_TRANSITIVE` | Backward and forward compatible among the schema V3, V2, and V1. | <li>Modify optional fields</li> | All previous versions | Any order | + +### ALWAYS_COMPATIBLE and ALWAYS_INCOMPATIBLE + +| Compatibility check strategy | Definition | Note | +| --- | --- | --- | +| `ALWAYS_COMPATIBLE` | Disable schema compatibility check. | None | +| `ALWAYS_INCOMPATIBLE` | Disable schema evolution, that is, any schema change is rejected. |
<li>For all schema types except Avro and JSON, the default schema compatibility check strategy is `ALWAYS_INCOMPATIBLE`.</li><li>For Avro and JSON, the default schema compatibility check strategy is `FULL`.</li>
  • | + +#### Example + +* Example 1 + + In some situations, an application needs to store events of several different types in the same Pulsar topic. + + In particular, when developing a data model in an `Event Sourcing` style, you might have several kinds of events that affect the state of an entity. + + For example, for a user entity, there are `userCreated`, `userAddressChanged` and `userEnquiryReceived` events. The application requires that those events are always read in the same order. + + Consequently, those events need to go in the same Pulsar partition to maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different kinds of events co-exist in the same topic. + +* Example 2 + + Sometimes we also make incompatible changes. + + For example, you are modifying a field type from `string` to `int`. + + In this case, you need to: + + * Upgrade all producers and consumers to the new schema versions at the same time. + + * Optionally, create a new topic and start migrating applications to use the new topic and the new schema, avoiding the need to handle two incompatible versions in the same topic. + +### BACKWARD and BACKWARD_TRANSITIVE + +Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest: + +| Compatibility check strategy | Definition | Description | +|---|---|---| +`BACKWARD` | Consumers using the new schema can process data written by producers using the **last schema**. | The consumers using the schema V3 can process data written by producers using the schema V3 or V2. | +`BACKWARD_TRANSITIVE` | Consumers using the new schema can process data written by producers using **all previous schemas**. | The consumers using the schema V3 can process data written by producers using the schema V3, V2, or V1. | + +#### Example + +* Example 1 + + Remove a field. 
+ + A consumer constructed to process events without one field can process events written with the old schema containing the field, and the consumer will ignore that field. + +* Example 2 + + You want to load all Pulsar data into a Hive data warehouse and run SQL queries against the data. + + Same SQL queries must continue to work even the data is changed. To support it, you can evolve the schemas using the `BACKWARD` strategy. + +### FORWARD and FORWARD_TRANSITIVE + +Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest: + +| Compatibility check strategy | Definition | Description | +|---|---|---| +`FORWARD` | Consumers using the **last schema** can process data written by producers using a new schema, even though they may not be able to use the full capabilities of the new schema. | The consumers using the schema V3 or V2 can process data written by producers using the schema V3. | +`FORWARD_TRANSITIVE` | Consumers using **all previous schemas** can process data written by producers using a new schema. | The consumers using the schema V3, V2, or V1 can process data written by producers using the schema V3. + +#### Example + +* Example 1 + + Add a field. + + In most data formats, consumers written to process events without new fields can continue doing so even when they receive new events containing new fields. + +* Example 2 + + If a consumer has an application logic tied to a full version of a schema, the application logic may not be updated instantly when the schema evolves. + + In this case, you need to project data with a new schema onto an old schema that the application understands. + + Consequently, you can evolve the schemas using the `FORWARD` strategy to ensure that the old schema can process data encoded with the new schema. 
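Taken together, the `BACKWARD` and `FORWARD` rules above boil down to one question: can a reader's schema decode a record written with another schema? The sketch below is an illustrative model only, not Pulsar's actual checker (the class, the method names, and the field-map representation are invented here); it treats a field as optional when it has a default value, which is how Avro-style checks reason about compatibility.

```java
import java.util.Map;

public class CompatibilitySketch {

    // Schema model: field name -> optional? (a field is optional when it has a default).
    // A reader can decode a writer's record when every required (non-optional)
    // reader field exists in the writer's schema.
    public static boolean canRead(Map<String, Boolean> readerSchema, Map<String, Boolean> writerSchema) {
        for (Map.Entry<String, Boolean> field : readerSchema.entrySet()) {
            boolean optional = field.getValue();
            if (!optional && !writerSchema.containsKey(field.getKey())) {
                return false;
            }
        }
        return true;
    }

    // BACKWARD: consumers on the new schema read data written with the old schema.
    public static boolean backward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: consumers on the old schema read data written with the new schema.
    public static boolean forward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    // FULL: compatible in both directions.
    public static boolean full(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return backward(oldSchema, newSchema) && forward(oldSchema, newSchema);
    }
}
```

With this model, deleting a field is backward compatible (the new reader simply requires less), while adding a field is backward compatible only when the field is optional, matching the "Changes allowed" column in the strategy table.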
+ +### FULL and FULL_TRANSITIVE + +Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest: + +| Compatibility check strategy | Definition | Description | Note | +| --- | --- | --- | --- | +| `FULL` | Schemas are both backward and forward compatible, which means: Consumers using the last schema can process data written by producers using the new schema. AND Consumers using the new schema can process data written by producers using the last schema. | Consumers using the schema V3 can process data written by producers using the schema V3 or V2. AND Consumers using the schema V3 or V2 can process data written by producers using the schema V3. |
<li>For Avro and JSON, the default schema compatibility check strategy is `FULL`.</li><li>For all schema types except Avro and JSON, the default schema compatibility check strategy is `ALWAYS_INCOMPATIBLE`.</li>
  • | +| `FULL_TRANSITIVE` | The new schema is backward and forward compatible with all previously registered schemas. | Consumers using the schema V3 can process data written by producers using the schema V3, V2 or V1. AND Consumers using the schema V3, V2 or V1 can process data written by producers using the schema V3. | None | + +#### Example + +In some data formats, for example, Avro, you can define fields with default values. Consequently, adding or removing a field with a default value is a fully compatible change. + +## Schema verification + +When a producer or a consumer tries to connect to a topic, a broker performs some checks to verify a schema. + +### Producer + +When a producer tries to connect to a topic (suppose ignore the schema auto creation), a broker does the following checks: + +* Check if the schema carried by the producer exists in the schema registry or not. + + * If the schema is already registered, then the producer is connected to a broker and produce messages with that schema. + + * If the schema is not registered, then Pulsar verifies if the schema is allowed to be registered based on the configured compatibility check strategy. + +### Consumer +When a consumer tries to connect to a topic, a broker checks if a carried schema is compatible with a registered schema based on the configured schema compatibility check strategy. + +| Compatibility check strategy | Check logic | +| --- | --- | +| `ALWAYS_COMPATIBLE` | All pass | +| `ALWAYS_INCOMPATIBLE` | No pass | +| `BACKWARD` | Can read the last schema | +| `BACKWARD_TRANSITIVE` | Can read all schemas | +| `FORWARD` | Can read the last schema | +| `FORWARD_TRANSITIVE` | Can read the last schema | +| `FULL` | Can read the last schema | +| `FULL_TRANSITIVE` | Can read all schemas | + +## Order of upgrading clients + +The order of upgrading client applications is determined by the compatibility check strategy. 
+ +For example, consider producers that use schemas to write data to Pulsar and consumers that use schemas to read data from Pulsar. + +| Compatibility check strategy | Upgrade first | Description | +| --- | --- | --- | +| `ALWAYS_COMPATIBLE` | Any order | The compatibility check is disabled. Consequently, you can upgrade the producers and consumers in **any order**. | +| `ALWAYS_INCOMPATIBLE` | None | The schema evolution is disabled. | +|
<li>`BACKWARD`</li><li>`BACKWARD_TRANSITIVE`</li> | Consumers | There is no guarantee that consumers using the old schema can read data produced using the new schema. Consequently, **upgrade all consumers first**, and then start producing new data. | +| <li>`FORWARD`</li><li>`FORWARD_TRANSITIVE`</li> | Producers | There is no guarantee that consumers using the new schema can read data produced using the old schema. Consequently, **upgrade all producers first** to use the new schema and ensure that the data already produced using the old schemas are not available to consumers, and then upgrade the consumers. | +| <li>`FULL`</li><li>`FULL_TRANSITIVE`</li>
  • | Any order | There is no guarantee that consumers using the old schema can read data produced using the new schema and consumers using the new schema can read data produced using the old schema. Consequently, you can upgrade the producers and consumers in **any order**. | + + + + diff --git a/site2/website-next/versioned_docs/version-2.7.2/schema-get-started.md b/site2/website-next/versioned_docs/version-2.7.2/schema-get-started.md new file mode 100644 index 0000000000000..97529fbb125ca --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/schema-get-started.md @@ -0,0 +1,106 @@ +--- +id: schema-get-started +title: Get started +sidebar_label: "Get started" +original_id: schema-get-started +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +This chapter introduces Pulsar schemas and explains why they are important. + +## Schema Registry + +Type safety is extremely important in any application built around a message bus like Pulsar. + +Producers and consumers need some kind of mechanism for coordinating types at the topic level to avoid various potential problems arise. For example, serialization and deserialization issues. + +Applications typically adopt one of the following approaches to guarantee type safety in messaging. Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis. + +#### Note +> +> Currently, the Pulsar schema registry is only available for the [Java client](client-libraries-java.md), [CGo client](client-libraries-cgo.md), [Python client](client-libraries-python.md), and [C++ client](client-libraries-cpp). + +### Client-side approach + +Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. 
+ +If a producer is sending temperature sensor data on the topic `topic-1`, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings. + +Producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis. + +### Server-side approach + +Producers and consumers inform the system which data types can be transmitted via the topic. + +With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced. + +Pulsar has a built-in **schema registry** that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic. + +## Why use schema + +Without a schema, Pulsar does not parse data: it takes bytes as inputs and sends bytes as outputs. Because data has meaning beyond bytes, you need to parse it yourself and might encounter parse exceptions, which mainly occur in the following situations: + +* The field does not exist + +* The field type has changed (for example, `string` is changed to `int`) + +There are a few ways to prevent and overcome these exceptions. For example, you can catch exceptions on parse errors, but that makes code hard to maintain. Alternatively, you can adopt a schema management system that performs schema evolution without breaking downstream applications and enforces type safety to the maximum extent in the language you are using. Pulsar Schema is such a solution. + +Pulsar schema enables you to use language-specific types of data when constructing and handling messages, from simple types like `string` to more complex application-specific types. + +**Example** + +You can use the _User_ class to define the messages sent to Pulsar topics. + +```java + +public class User { + String name; + int age; +} + +``` + +When constructing a producer with the _User_ class, you can specify a schema or not, as shown below. 
+ +### Without schema + +If you construct a producer without specifying a schema, then the producer can only produce messages of type `byte[]`. If you have a POJO class, you need to serialize the POJO into bytes before sending messages. + +**Example** + +``` + +Producer producer = client.newProducer() + .topic(topic) + .create(); +User user = new User("Tom", 28); +byte[] message = … // serialize the `user` by yourself; +producer.send(message); + +``` + +### With schema + +If you construct a producer with specifying a schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes. + +**Example** + +This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes. + +``` + +Producer producer = client.newProducer(JSONSchema.of(User.class)) + .topic(topic) + .create(); +User user = new User("Tom", 28); +producer.send(user); + +``` + +### Summary + +When constructing a producer with a schema, you do not need to serialize messages into bytes, instead Pulsar schema does this job in the background. 
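To make the contrast concrete, the snippet below spells out the serialization boilerplate that the "without schema" path leaves to you — the step a schema-aware producer performs in the background. It is a plain-Java illustration only; the hand-rolled JSON encoding here is arbitrary and is not what Pulsar's `JSONSchema` actually emits.

```java
import java.nio.charset.StandardCharsets;

public class UserSerializationSketch {

    public static class User {
        String name;
        int age;
        public User(String name, int age) { this.name = name; this.age = age; }
    }

    // Hand-rolled serialization: the boilerplate you own when producing byte[] messages.
    public static byte[] serialize(User user) {
        String json = "{\"name\":\"" + user.name + "\",\"age\":" + user.age + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // ...and the matching boilerplate every consumer must agree on, out of band.
    public static String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

Every producer and consumer of the topic must agree on this encoding by convention alone; a schema moves that agreement into the topic itself.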
diff --git a/site2/website-next/versioned_docs/version-2.7.2/schema-manage.md b/site2/website-next/versioned_docs/version-2.7.2/schema-manage.md new file mode 100644 index 0000000000000..b8377c0e6f4fb --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/schema-manage.md @@ -0,0 +1,684 @@ +--- +id: schema-manage +title: Manage schema +sidebar_label: "Manage schema" +original_id: schema-manage +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +This guide demonstrates the ways to manage schemas: + +* Automatically + + * [Schema AutoUpdate](#schema-autoupdate) + +* Manually + + * [Schema manual management](#schema-manual-management) + + * [Custom schema storage](#custom-schema-storage) + +## Schema AutoUpdate + +If a schema passes the schema compatibility check, Pulsar producer automatically updates this schema to the topic it produces by default. + +### AutoUpdate for producer + +For a producer, the `AutoUpdate` happens in the following cases: + +* If a **topic doesn’t have a schema**, Pulsar registers a schema automatically. + +* If a **topic has a schema**: + + * If a **producer doesn’t carry a schema**: + + * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **disabled** in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data. + + * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **enabled** in the namespace to which the topic belongs, the producer is rejected and disconnected. + + * If a **producer carries a schema**: + + A broker performs the compatibility check based on the configured compatibility check strategy of the namespace to which the topic belongs. + + * If the schema is registered, a producer is connected to a broker. + + * If the schema is not registered: + + * If `isAllowAutoUpdateSchema` sets to **false**, the producer is rejected to connect to a broker. 
+ + * If `isAllowAutoUpdateSchema` sets to **true**: + + * If the schema passes the compatibility check, then the broker registers a new schema automatically for the topic and the producer is connected. + + * If the schema does not pass the compatibility check, then the broker does not register a schema and the producer is rejected to connect to a broker. + +![AutoUpdate Producer](/assets/schema-producer.png) + +### AutoUpdate for consumer + +For a consumer, the `AutoUpdate` happens in the following cases: + +* If a **consumer connects to a topic without a schema** (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check. + +* If a **consumer connects to a topic with a schema**. + + * If a topic does not have all of them (a schema/data/a local consumer and a local producer): + + * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker. + + * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker. + + * If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed. + + * If the schema passes the compatibility check, then the consumer is connected to the broker. + + * If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker. + +![AutoUpdate Consumer](/assets/schema-consumer.png) + + +### Manage AutoUpdate strategy + +You can use the `pulsar-admin` command to manage the `AutoUpdate` strategy as below: + +* [Enable AutoUpdate](#enable-autoupdate) + +* [Disable AutoUpdate](#disable-autoupdate) + +* [Adjust compatibility](#adjust-compatibility) + +#### Enable AutoUpdate + +To enable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command. 
+ +```bash + +bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace + +``` + +#### Disable AutoUpdate + +To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command. + +```bash + +bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace + +``` + +Once the `AutoUpdate` is disabled, you can only register a new schema using the `pulsar-admin` command. + +#### Adjust compatibility + +To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command. + +```bash + +bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility tenant/namespace + +``` + +### Schema validation + +By default, `schemaValidationEnforced` is **disabled** for producers: + +* This means a producer without a schema can produce any kind of messages to a topic with schemas, which may result in producing trash data to the topic. + +* This allows non-java language clients that don’t support schema can produce messages to a topic with schemas. + +However, if you want a stronger guarantee on the topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis. + +#### Enable schema validation + +To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command. + +```bash + +bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace + +``` + +#### Disable schema validation + +To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command. + +```bash + +bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace + +``` + +## Schema manual management + +To manage schemas, you can use one of the following methods. + +| Method | Description | +| --- | --- | +| **Admin CLI**
 | You can use the `pulsar-admin` tool to manage Pulsar schemas, brokers, clusters, sources, sinks, topics, tenants and so on. For more information about how to use the `pulsar-admin` tool, see [here](reference-pulsar-admin). | +| **REST API** | Pulsar exposes schema related management API in Pulsar’s admin RESTful API. You can access the admin RESTful endpoint directly to manage schemas. For more information about how to use the Pulsar REST API, see [here](http://pulsar.apache.org/admin-rest-api/). | +| **Java Admin API**
  • | Pulsar provides Java admin library. | + +### Upload a schema + +To upload (register) a new schema for a topic, you can use one of the following methods. + + + + + +Use the `upload` subcommand. + +```bash + +$ pulsar-admin schemas upload --filename + +``` + +The `schema-definition-file` is in JSON format. + +```json + +{ + "type": "", + "schema": "", + "properties": {} // the properties associated with the schema +} + +``` + +The `schema-definition-file` includes the following fields: + +| Field | Description | +| --- | --- | +| `type` | The schema type. | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
<li>If the schema is a **primitive** schema, this field should be blank.</li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.</li>
  • | +| `properties` | The additional properties associated with the schema. | + +Here are examples of the `schema-definition-file` for a JSON schema. + +**Example 1** + +```json + +{ + "type": "JSON", + "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}", + "properties": {} +} + +``` + +**Example 2** + +```json + +{ + "type": "STRING", + "schema": "", + "properties": { + "key1": "value1" + } +} + +``` + +
     + + +Send a `POST` request to this endpoint: {@inject: endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema?version=@pulsar:version_number@} + +The post payload is in JSON format. + +```json + +{ + "type": "", + "schema": "", + "properties": {} // the properties associated with the schema +} + +``` + +The post payload includes the following fields: + +| Field | Description | +| --- | --- | +| `type` | The schema type. | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
<li>If the schema is a **primitive** schema, this field should be blank.</li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.</li> | +| `properties` | The additional properties associated with the schema. | + +
    + + +```java + +void createSchema(String topic, PostSchemaPayload schemaPayload) + +``` + +The `PostSchemaPayload` includes the following fields: + +| Field | Description | +| --- | --- | +| `type` | The schema type. | +| `schema` | The schema definition data, which is encoded in UTF 8 charset.
<li>If the schema is a **primitive** schema, this field should be blank.</li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.</li>
  • | +| `properties` | The additional properties associated with the schema. | + +Here is an example of `PostSchemaPayload`: + +```java + +PulsarAdmin admin = …; + +PostSchemaPayload payload = new PostSchemaPayload(); +payload.setType("INT8"); +payload.setSchema(""); + +admin.createSchema("my-tenant/my-ns/my-topic", payload); + +``` + +
    + +
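For reference, the `{type, schema, properties}` body used by both the CLI and REST upload examples above can be assembled with ordinary string handling. The sketch below has no Pulsar dependency and the helper name is invented for illustration; the point it makes is that a struct schema's Avro definition must be embedded as an escaped JSON string, while a primitive schema leaves the `schema` field blank.

```java
public class SchemaPayloadSketch {

    // Builds the {type, schema, properties} JSON body expected by
    // `pulsar-admin schemas upload` and the REST upload endpoint.
    // For a struct schema, "schema" carries the Avro definition as a JSON
    // string, so its inner quotes must be escaped.
    public static String buildPayload(String type, String avroDefinition) {
        String escaped = avroDefinition.replace("\"", "\\\"");
        return "{\n"
             + "  \"type\": \"" + type + "\",\n"
             + "  \"schema\": \"" + escaped + "\",\n"
             + "  \"properties\": {}\n"
             + "}";
    }
}
```

For a primitive schema such as `STRING`, passing an empty `avroDefinition` yields the blank `schema` field shown in Example 2 above.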
    + +### Get a schema (latest) + +To get the latest schema for a topic, you can use one of the following methods. + + + + + +Use the `get` subcommand. + +```bash + +$ pulsar-admin schemas get + +{ + "version": 0, + "type": "String", + "timestamp": 0, + "data": "string", + "properties": { + "property1": "string", + "property2": "string" + } +} + +``` + + + + +Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchem?version=@pulsar:version_number@a} + +Here is an example of a response, which is returned in JSON format. + +```json + +{ + "version": "", + "type": "", + "timestamp": "", + "data": "", + "properties": {} // the properties associated with the schema +} + +``` + +The response includes the following fields: + +| Field | Description | +| --- | --- | +| `version` | The schema version, which is a long number. | +| `type` | The schema type. | +| `timestamp` | The timestamp of creating this version of schema. | +| `data` | The schema definition data, which is encoded in UTF 8 charset.
<li>If the schema is a **primitive** schema, this field should be blank.</li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.</li> | +| `properties` | The additional properties associated with the schema. | + +
     + + +```java + +SchemaInfo getSchema(String topic) + +``` + +The `SchemaInfo` includes the following fields: + +| Field | Description | +| --- | --- | +| `name` | The schema name. | +| `type` | The schema type. | +| `schema` | A byte array of the schema definition data, which is encoded in UTF 8 charset.
<li>If the schema is a **primitive** schema, this byte array should be empty.</li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array.</li>
  • | +| `properties` | The additional properties associated with the schema. | + +Here is an example of `SchemaInfo`: + +```java + +PulsarAdmin admin = …; + +SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic"); + +``` + +
    + +
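Because the `schema` field of `SchemaInfo` is the definition encoded as UTF-8 bytes (and empty for primitive schemas), recovering the readable Avro JSON is a one-liner. A minimal plain-Java sketch, with an invented helper name:

```java
import java.nio.charset.StandardCharsets;

public class SchemaBytesSketch {

    // The schema field of SchemaInfo holds UTF-8 bytes of the definition;
    // an empty array indicates a primitive schema with no definition body.
    public static String definitionOf(byte[] schemaData) {
        if (schemaData == null || schemaData.length == 0) {
            return ""; // primitive schema
        }
        return new String(schemaData, StandardCharsets.UTF_8);
    }
}
```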
    + +### Get a schema (specific) + +To get a specific version of a schema, you can use one of the following methods. + + + + + +Use the `get` subcommand. + +```bash + +$ pulsar-admin schemas get --version= + +``` + + + + +Send a `GET` request to a schema endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchem?version=@pulsar:version_number@a} + +Here is an example of a response, which is returned in JSON format. + +```json + +{ + "version": "", + "type": "", + "timestamp": "", + "data": "", + "properties": {} // the properties associated with the schema +} + +``` + +The response includes the following fields: + +| Field | Description | +| --- | --- | +| `version` | The schema version, which is a long number. | +| `type` | The schema type. | +| `timestamp` | The timestamp of creating this version of schema. | +| `data` | The schema definition data, which is encoded in UTF 8 charset.
<li>If the schema is a **primitive** schema, this field should be blank.</li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition.</li> | +| `properties` | The additional properties associated with the schema. | + +
     + + +```java + +SchemaInfo getSchema(String topic, long version) + +``` + +The `SchemaInfo` includes the following fields: + +| Field | Description | +| --- | --- | +| `name` | The schema name. | +| `type` | The schema type. | +| `schema` | A byte array of the schema definition data, which is encoded in UTF 8.
<li>If the schema is a **primitive** schema, this byte array should be empty.</li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array.</li>
  • | +| `properties` | The additional properties associated with the schema. | + +Here is an example of `SchemaInfo`: + +```java + +PulsarAdmin admin = …; + +SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L); + +``` + +
    + +
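The difference between the two `get` variants above — latest versus a specific version — can be modeled with a map sorted by version number. The sketch below is an in-memory illustration only (the class is invented here; real lookups go through the broker's schema registry):

```java
import java.util.TreeMap;

public class VersionLookupSketch {

    // version (a long) -> schema definition, mirroring how each SchemaInfo
    // stored with a topic carries a version number.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    public void register(long version, String definition) {
        versions.put(version, definition);
    }

    // Analogous to `schemas get <topic>`: the highest registered version.
    public String latest() {
        return versions.isEmpty() ? null : versions.lastEntry().getValue();
    }

    // Analogous to `schemas get --version=<n> <topic>`.
    public String at(long version) {
        return versions.get(version);
    }
}
```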
    + +### Extract a schema + +To provide a schema via a topic, you can use the following method. + + + + + +Use the `extract` subcommand. + +```bash + +$ pulsar-admin schemas extract --classname --jar --type + +``` + + + + + +### Delete a schema + +To delete a schema for a topic, you can use one of the following methods. + +:::note + +In any case, the **delete** action deletes **all versions** of a schema registered for a topic. + +::: + + + + + +Use the `delete` subcommand. + +```bash + +$ pulsar-admin schemas delete + +``` + + + + +Send a `DELETE` request to a schema endpoint: {@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema?version=@pulsar:version_number@} + +Here is an example of a response, which is returned in JSON format. + +```json + +{ + "version": "", +} + +``` + +The response includes the following field: + +Field | Description | +---|---| +`version` | The schema version, which is a long number. | + + + + +```java + +void deleteSchema(String topic) + +``` + +Here is an example of deleting a schema. + +```java + +PulsarAdmin admin = …; + +admin.deleteSchema("my-tenant/my-ns/my-topic"); + +``` + + + + + +## Custom schema storage + +By default, Pulsar stores various data types of schemas in [Apache BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar. + +However, you can use another storage system if needed. 
### Implement

To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces:

* [SchemaStorage interface](#schemastorage-interface)
* [SchemaStorageFactory interface](#schemastoragefactory-interface)

#### SchemaStorage interface

The `SchemaStorage` interface has the following methods:

```java
public interface SchemaStorage {
    // How schemas are updated
    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);

    // How schemas are fetched from storage
    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);

    // How schemas are deleted
    CompletableFuture<SchemaVersion> delete(String key);

    // Utility method for converting a schema version byte array to a SchemaVersion object
    SchemaVersion versionFromBytes(byte[] version);

    // Startup behavior for the schema storage client
    void start() throws Exception;

    // Shutdown behavior for the schema storage client
    void close() throws Exception;
}
```

:::tip

For a complete example of **schema storage** implementation, see the [BookkeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class.

:::

#### SchemaStorageFactory interface

The `SchemaStorageFactory` interface has the following method:

```java
public interface SchemaStorageFactory {
    @NotNull
    SchemaStorage create(PulsarService pulsar) throws Exception;
}
```

:::tip

For a complete example of **schema storage factory** implementation, see the [BookkeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class.

:::

### Deploy

To use your custom schema storage implementation, perform the following steps.

1.
Package the implementation in a [JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html) file. + +2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution. + +3. Change the `schemaRegistryStorageClassName` configuration in `broker.conf` to your custom factory class. + +4. Start Pulsar. diff --git a/site2/website-next/versioned_docs/version-2.7.2/schema-understand.md b/site2/website-next/versioned_docs/version-2.7.2/schema-understand.md new file mode 100644 index 0000000000000..25fd65e9453b2 --- /dev/null +++ b/site2/website-next/versioned_docs/version-2.7.2/schema-understand.md @@ -0,0 +1,481 @@ +--- +id: schema-understand +title: Understand schema +sidebar_label: "Understand schema" +original_id: schema-understand +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + +This chapter explains the basic concepts of Pulsar schema, focuses on the topics of particular importance, and provides additional background. + +## SchemaInfo + +Pulsar schema is defined in a data structure called `SchemaInfo`. + +The `SchemaInfo` is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level. + +A `SchemaInfo` consists of the following fields: + +| Field | Description | +| --- | --- | +| `name` | Schema name (a string). | +| `type` | Schema type, which determines how to interpret the schema data.
  • Predefined schema: see [here](schema-understand.md#schema-type).
  • Customized schema: the type is left as an empty string.
|
| `schema` (`payload`) | Schema data, which is a sequence of 8-bit unsigned bytes and is schema-type specific. |
| `properties` | User-defined properties as a string/string map. Applications can use this bag to carry any application-specific logic. Possible properties might be the Git hash associated with the schema, or an environment string like `dev` or `prod`. |

**Example**

This is the `SchemaInfo` of a string.

```json
{
  "name": "test-string-schema",
  "type": "STRING",
  "schema": "",
  "properties": {}
}
```

## Schema type

Pulsar supports various schema types, which are mainly divided into two categories:

* Primitive type
* Complex type

### Primitive type

Currently, Pulsar supports the following primitive types:

| Primitive Type | Description |
|---|---|
| `BOOLEAN` | A binary value |
| `INT8` | An 8-bit signed integer |
| `INT16` | A 16-bit signed integer |
| `INT32` | A 32-bit signed integer |
| `INT64` | A 64-bit signed integer |
| `FLOAT` | A single-precision (32-bit) IEEE 754 floating-point number |
| `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number |
| `BYTES` | A sequence of 8-bit unsigned bytes |
| `STRING` | A Unicode character sequence |
| `TIMESTAMP` (`DATE`, `TIME`) | A logical type that represents a specific instant in time with millisecond precision.
It stores the number of milliseconds since `January 1, 1970, 00:00:00 GMT` as an `INT64` value. |
| `INSTANT` | A single instantaneous point on the timeline with nanosecond precision. |
| `LOCAL_DATE` | An immutable date-time object that represents a date, often viewed as year-month-day. |
| `LOCAL_TIME` | An immutable date-time object that represents a time, often viewed as hour-minute-second. Time is represented to nanosecond precision. |
| `LOCAL_DATE_TIME` | An immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second. |

For primitive types, Pulsar does not store any schema data in `SchemaInfo`. The `type` in `SchemaInfo` is used to determine how to serialize and deserialize the data.

Some of the primitive schema implementations can use `properties` to store implementation-specific tunable settings. For example, a `string` schema can use `properties` to store the encoding charset to serialize and deserialize strings.

The conversions between **Pulsar schema types** and **language-specific primitive types** are as below.

| Schema Type | Java Type | Python Type | Go Type |
|---|---|---|---|
| `BOOLEAN` | boolean | bool | bool |
| `INT8` | byte | | int8 |
| `INT16` | short | | int16 |
| `INT32` | int | | int32 |
| `INT64` | long | | int64 |
| `FLOAT` | float | float | float32 |
| `DOUBLE` | double | float | float64 |
| `BYTES` | byte[], ByteBuffer, ByteBuf | bytes | []byte |
| `STRING` | string | str | string |
| `TIMESTAMP` | java.sql.Timestamp | | |
| `TIME` | java.sql.Time | | |
| `DATE` | java.util.Date | | |
| `INSTANT` | java.time.Instant | | |
| `LOCAL_DATE` | java.time.LocalDate | | |
| `LOCAL_TIME` | java.time.LocalTime | | |
| `LOCAL_DATE_TIME` | java.time.LocalDateTime | | |

**Example**

This example demonstrates how to use a string schema.

1. Create a producer with a string schema and send messages.
   ```java
   Producer<String> producer = client.newProducer(Schema.STRING).create();
   producer.newMessage().value("Hello Pulsar!").send();
   ```

2. Create a consumer with a string schema and receive messages.

   ```java
   Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
   consumer.receive();
   ```

### Complex type

Currently, Pulsar supports the following complex types:

| Complex Type | Description |
|---|---|
| `keyvalue` | Represents a complex type of a key/value pair. |
| `struct` | Supports **AVRO**, **JSON**, and **Protobuf**. |

#### keyvalue

`Keyvalue` schema helps applications define schemas for both key and value.

For `SchemaInfo` of `keyvalue` schema, Pulsar stores the `SchemaInfo` of the key schema and the `SchemaInfo` of the value schema together.

Pulsar provides two methods to encode a key/value pair in messages:

* `INLINE`
* `SEPARATED`

Users can choose the encoding type when constructing the key/value schema.

##### INLINE

Key/value pairs will be encoded together in the message payload.

##### SEPARATED

Key will be encoded in the message key and the value will be encoded in the message payload.

**Example**

This example shows how to construct a key/value schema and then use it to produce and consume messages.

1. Construct a key/value schema with `INLINE` encoding type.

   ```java
   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
       Schema.INT32,
       Schema.STRING,
       KeyValueEncodingType.INLINE
   );
   ```

2. Optionally, construct a key/value schema with `SEPARATED` encoding type.

   ```java
   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
       Schema.INT32,
       Schema.STRING,
       KeyValueEncodingType.SEPARATED
   );
   ```

3. Produce messages using a key/value schema.
   ```java
   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
       Schema.INT32,
       Schema.STRING,
       KeyValueEncodingType.SEPARATED
   );

   Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
       .topic(TOPIC)
       .create();

   final int key = 100;
   final String value = "value-100";

   // send the key/value message
   producer.newMessage()
       .value(new KeyValue<>(key, value))
       .send();
   ```

4. Consume messages using a key/value schema.

   ```java
   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
       Schema.INT32,
       Schema.STRING,
       KeyValueEncodingType.SEPARATED
   );

   Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
       ...
       .topic(TOPIC)
       .subscriptionName(SubscriptionName).subscribe();

   // receive the key/value pair
   Message<KeyValue<Integer, String>> msg = consumer.receive();
   KeyValue<Integer, String> kv = msg.getValue();
   ```

#### struct

Pulsar uses the [Avro Specification](http://avro.apache.org/docs/current/spec.html) to declare the schema definition for `struct` schema.

This allows Pulsar:

* to use the same tools to manage schema definitions
* to use different serialization/deserialization methods to handle data

There are two methods to use `struct` schema:

* `static`
* `generic`

##### static

You can predefine the `struct` schema, which can be a POJO in Java, a `struct` in Go, or classes generated by Avro or Protobuf tools.

**Example**

Pulsar gets the schema definition from the predefined `struct` using an Avro library. The schema definition is the schema data stored as a part of the `SchemaInfo`.

1. Create the _User_ class to define the messages sent to Pulsar topics.

   ```java
   public class User {
       String userName;
       long userId;
   }
   ```

2. Create a producer with a `struct` schema and send messages.

   ```java
   Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
   producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();
   ```

3.
Create a consumer with a `struct` schema and receive messages.

   ```java
   Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
   User user = consumer.receive().getValue();
   ```

##### generic

Sometimes applications do not have pre-defined structs, and you can use this method to define the schema and access data.

You can define the `struct` schema using `GenericSchemaBuilder`, generate a generic struct using `GenericRecordBuilder`, and consume messages into `GenericRecord`.

**Example**

1. Use `RecordSchemaBuilder` to build a schema.

   ```java
   RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
   recordSchemaBuilder.field("intField").type(SchemaType.INT32);
   SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);

   Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo)).create();
   ```

2. Use `RecordBuilder` to build the struct records.

   ```java
   producer.newMessage().value(schema.newRecordBuilder()
       .set("intField", 32)
       .build()).send();
   ```

### Auto Schema

If you don't know the schema type of a Pulsar topic in advance, you can use AUTO schema to produce or consume generic records to or from brokers.

| Auto Schema Type | Description |
|---|---|
| `AUTO_PRODUCE` | This is useful for transferring data **from a producer to a Pulsar topic that has a schema**. |
| `AUTO_CONSUME` | This is useful for transferring data **from a Pulsar topic that has a schema to a consumer**. |

#### AUTO_PRODUCE

`AUTO_PRODUCE` schema helps a producer validate whether the bytes it sends are compatible with the schema of a topic.

**Example**

Suppose that:

* You have a producer processing messages from a Kafka topic _K_.
* You have a Pulsar topic _P_, and you do not know its schema type.
* Your application reads the messages from _K_ and writes the messages to _P_.
In this case, you can use `AUTO_PRODUCE` to verify whether the bytes produced by _K_ can be sent to _P_ or not.

```java
Producer<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
    …
    .create();

byte[] kafkaMessageBytes = … ;

pulsarProducer.send(kafkaMessageBytes);
```

#### AUTO_CONSUME

`AUTO_CONSUME` schema helps a consumer validate whether the bytes it receives from a Pulsar topic are compatible with its schema. That is, the consumer deserializes messages into language-specific objects using the `SchemaInfo` retrieved from the broker side.

Currently, `AUTO_CONSUME` only supports **AVRO** and **JSON** schemas. It deserializes messages into `GenericRecord`.

**Example**

Suppose that:

* You have a Pulsar topic _P_.
* You have a consumer (for example, MySQL) receiving messages from the topic _P_.
* Your application reads the messages from _P_ and writes the messages to MySQL.

In this case, you can use `AUTO_CONSUME` to verify whether the bytes produced by _P_ can be sent to MySQL or not.

```java
Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
    …
    .subscribe();

Message<GenericRecord> msg = pulsarConsumer.receive();
GenericRecord record = msg.getValue();
```

## Schema version

Each `SchemaInfo` stored with a topic has a version. The schema version manages schema changes happening within a topic.

Messages produced with a given `SchemaInfo` are tagged with a schema version, so when a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding `SchemaInfo` and then use that `SchemaInfo` to deserialize data.

Schemas are versioned in succession. Schema storage happens in the broker that handles the associated topics so that version assignments can be made.

Once a version is assigned to or fetched for a schema, all subsequent messages produced by that producer are tagged with the appropriate version.
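The version-assignment behavior described above can be sketched with a toy registry (pure JDK; `SchemaVersionToy` is a made-up class, not Pulsar code): uploading a new schema gets the next version in succession, while re-uploading an already-stored schema returns its existing version instead of creating a new one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of broker-side schema version assignment for one topic.
public class SchemaVersionToy {
    private final List<byte[]> versions = new ArrayList<>();

    public long getOrAssignVersion(byte[] schemaData) {
        // Already stored: reuse the existing version number.
        for (int i = 0; i < versions.size(); i++) {
            if (Arrays.equals(versions.get(i), schemaData)) {
                return i;
            }
        }
        // New schema: append it and issue the next version in succession.
        versions.add(schemaData.clone());
        return versions.size() - 1;
    }

    public static void main(String[] args) {
        SchemaVersionToy topic = new SchemaVersionToy();
        long v1 = topic.getOrAssignVersion("schema-A".getBytes());
        long v2 = topic.getOrAssignVersion("schema-B".getBytes());
        long again = topic.getOrAssignVersion("schema-A".getBytes());
        System.out.println(v1 + " " + v2 + " " + again); // 0 1 0
    }
}
```

This is why, in the scenarios below, a producer reconnecting with an already-stored schema is simply tagged with the existing version, while a new compatible schema gets a new version number.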
**Example**

The following example illustrates how the schema version works.

Suppose that a Pulsar [Java client](client-libraries-java) created using the code below attempts to connect to Pulsar and begins to send messages:

```java
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
    .topic("sensor-data")
    .sendTimeout(3, TimeUnit.SECONDS)
    .create();
```

The table below lists the possible scenarios when this connection attempt occurs and what happens in each scenario:

| Scenario | What happens |
| --- | --- |
| No schema exists for the topic. | (1) The producer is created using the given schema. (2) Since no existing schema is compatible with the `SensorReading` schema, the schema is transmitted to the broker and stored. (3) Any consumer created using the same schema or topic can consume messages from the `sensor-data` topic. |
| A schema already exists; the producer connects using the same schema that is already stored. | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it is used to tag produced messages. |
| A schema already exists; the producer connects using a new schema that is compatible. | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number). |

## How does schema work

Pulsar schemas are applied and enforced at the **topic** level (schemas cannot be applied at the namespace or tenant level).

Producers and consumers upload schemas to brokers, so Pulsar schemas work on both the producer side and the consumer side.

### Producer side

This diagram illustrates how schema works on the producer side.

![Schema works at the producer side](/assets/schema-producer.png)

1. The application uses a schema instance to construct a producer instance.

   The schema instance defines the schema for the data being produced using the producer instance.

   Taking AVRO as an example, Pulsar extracts the schema definition from the POJO class and constructs the `SchemaInfo` that the producer needs to pass to a broker when it connects.

2. The producer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance.

3. The broker looks up the schema in the schema storage to check if it is already a registered schema.

4. If yes, the broker skips the schema validation since it is a known schema, and returns the schema version to the producer.

5. If no, the broker verifies whether a schema can be automatically created in this namespace:

   * If `isAllowAutoUpdateSchema` is set to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.

   * If `isAllowAutoUpdateSchema` is set to **false**, then a schema cannot be created, and the producer is rejected and cannot connect to the broker.

**Tip**:

`isAllowAutoUpdateSchema` can be set via the **Pulsar admin API** or the **REST API**.

For how to set `isAllowAutoUpdateSchema` via the Pulsar admin API, see [Manage AutoUpdate Strategy](schema-manage.md#manage-autoupdate-strategy).
6. If the schema is allowed to be updated, then the compatibility check is performed.

   * If the schema is compatible, the broker stores it and returns the schema version to the producer.

     All the messages produced by this producer are tagged with the schema version.

   * If the schema is incompatible, the broker rejects it.

### Consumer side

This diagram illustrates how schema works on the consumer side.

![Schema works at the consumer side](/assets/schema-consumer.png)

1. The application uses a schema instance to construct a consumer instance.

   The schema instance defines the schema that the consumer uses for decoding messages received from a broker.

2. The consumer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance.

3. The broker determines whether the topic has any of the following: a schema, data, or a local producer/consumer.

4. If the topic has none of them (a schema, data, a local consumer, or a local producer):

   * If `isAllowAutoUpdateSchema` is set to **true**, then the consumer registers a schema and is connected to the broker.

   * If `isAllowAutoUpdateSchema` is set to **false**, then the consumer is rejected and cannot connect to the broker.

5. If the topic has any of them (a schema, data, a local consumer, or a local producer), then the schema compatibility check is performed.

   * If the schema passes the compatibility check, then the consumer is connected to the broker.

   * If the schema does not pass the compatibility check, then the consumer is rejected and cannot connect to the broker.

6. The consumer receives messages from the broker.
diff --git a/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json b/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json index 078728029e300..e8c9dbbc07207 100644 --- a/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json +++ b/site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json @@ -63,6 +63,28 @@ "id": "version-2.7.2/concepts-multiple-advertised-listeners" } ] + }, + { + "type": "category", + "label": "Pulsar Schema", + "items": [ + { + "type": "doc", + "id": "version-2.7.2/schema-get-started" + }, + { + "type": "doc", + "id": "version-2.7.2/schema-understand" + }, + { + "type": "doc", + "id": "version-2.7.2/schema-evolution-compatibility" + }, + { + "type": "doc", + "id": "version-2.7.2/schema-manage" + } + ] } ] } \ No newline at end of file