commit the first part
Yan Zhang committed Nov 17, 2021
1 parent 7a34ceb commit feca320
Showing 23 changed files with 3,608 additions and 0 deletions.
4 changes: 4 additions & 0 deletions site2/website-next/docusaurus.config.js
@@ -163,6 +163,10 @@ module.exports = {
label: "2.6.3",
to: "docs/2.6.3/",
},
{
label: "2.6.2",
to: "docs/2.6.2/",
},
{
label: "2.2.0",
to: "docs/2.2.0/",
@@ -0,0 +1,160 @@
---
id: concepts-architecture-overview
title: Architecture Overview
sidebar_label: "Architecture"
original_id: concepts-architecture-overview
---

At the highest level, a Pulsar instance is composed of one or more Pulsar clusters. Clusters within an instance can [replicate](concepts-replication) data amongst themselves.

In a Pulsar cluster:

* One or more brokers handle and load balance incoming messages from producers, dispatch messages to consumers, communicate with the Pulsar configuration store to handle various coordination tasks, store messages in BookKeeper instances (aka bookies), rely on a cluster-specific ZooKeeper cluster for certain tasks, and more.
* A BookKeeper cluster consisting of one or more bookies handles [persistent storage](#persistent-storage) of messages.
* A ZooKeeper cluster specific to that cluster handles cluster-level configuration and coordination tasks.

The diagram below provides an illustration of a Pulsar cluster:

![Pulsar architecture diagram](/assets/pulsar-system-architecture.png)

At the broader instance level, an instance-wide ZooKeeper cluster called the configuration store handles coordination tasks involving multiple clusters, for example [geo-replication](concepts-replication).

## Brokers

The Pulsar message broker is a stateless component that's primarily responsible for running two other components:

* An HTTP server that exposes a {@inject: rest:REST:/} API for both administrative tasks and [topic lookup](concepts-clients.md#client-setup-phase) for producers and consumers
* A dispatcher, which is an asynchronous TCP server over a custom [binary protocol](developing-binary-protocol) used for all data transfers

Messages are typically dispatched out of a [managed ledger](#managed-ledgers) cache for the sake of performance, *unless* the backlog exceeds the cache size. If the backlog grows too large for the cache, the broker will start reading entries from BookKeeper.

Finally, to support geo-replication on global topics, the broker manages replicators that tail the entries published in the local region and republish them to the remote region using the Pulsar [Java client library](client-libraries-java).

> For a guide to managing Pulsar brokers, see the [brokers](admin-api-brokers) guide.

## Clusters

A Pulsar instance consists of one or more Pulsar *clusters*. Clusters, in turn, consist of:

* One or more Pulsar [brokers](#brokers)
* A ZooKeeper quorum used for cluster-level configuration and coordination
* An ensemble of bookies used for [persistent storage](#persistent-storage) of messages

Clusters can replicate amongst themselves using [geo-replication](concepts-replication).

> For a guide to managing Pulsar clusters, see the [clusters](admin-api-clusters) guide.

## Metadata store

Pulsar uses [Apache ZooKeeper](https://zookeeper.apache.org/) for metadata storage, cluster configuration, and coordination. In a Pulsar instance:

* A configuration store quorum stores configuration for tenants, namespaces, and other entities that need to be globally consistent.
* Each cluster has its own local ZooKeeper ensemble that stores cluster-specific configuration and coordination data, such as ownership metadata, broker load reports, BookKeeper ledger metadata, and more.

## Persistent storage

Pulsar provides guaranteed message delivery for applications. If a message successfully reaches a Pulsar broker, it will be delivered to its intended target.

This guarantee requires that non-acknowledged messages are stored in a durable manner until they can be delivered to and acknowledged by consumers. This mode of messaging is commonly called *persistent messaging*. In Pulsar, N copies of all messages are stored and synced on disk, for example 4 copies across two servers with mirrored [RAID](https://en.wikipedia.org/wiki/RAID) volumes on each server.

### Apache BookKeeper

Pulsar uses a system called [Apache BookKeeper](http://bookkeeper.apache.org/) for persistent message storage. BookKeeper is a distributed [write-ahead log](https://en.wikipedia.org/wiki/Write-ahead_logging) (WAL) system that provides a number of crucial advantages for Pulsar:

* It enables Pulsar to utilize many independent logs, called [ledgers](#ledgers). Multiple ledgers can be created for topics over time.
* It offers very efficient storage for sequential data that handles entry replication.
* It guarantees read consistency of ledgers in the presence of various system failures.
* It offers even distribution of I/O across bookies.
* It's horizontally scalable in both capacity and throughput. Capacity can be immediately increased by adding more bookies to a cluster.
* Bookies are designed to handle thousands of ledgers with concurrent reads and writes. By using multiple disk devices, one for the journal and another for general storage, bookies are able to isolate the effects of read operations from the latency of ongoing write operations.

In addition to message data, *cursors* are also persistently stored in BookKeeper. Cursors are [subscription](reference-terminology.md#subscription) positions for [consumers](reference-terminology.md#consumer). BookKeeper enables Pulsar to store consumer position in a scalable fashion.
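
Acknowledgements are what advance a cursor. As a hedged illustration only (the service URL, topic, and subscription names below are placeholders, not from this document), a minimal Java sketch:

```java

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

// Subscribing creates (or re-attaches to) a durable cursor for this subscription
Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscribe();

Message<byte[]> msg = consumer.receive();

// Acknowledging advances the cursor; the updated position is persisted in BookKeeper
consumer.acknowledge(msg);

```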

At the moment, Pulsar supports persistent message storage. This accounts for the `persistent` in all topic names. Here's an example:

```http
persistent://my-tenant/my-namespace/my-topic
```
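
As a hedged sketch (the service URL and names are placeholders), a Java client references a persistent topic by this full name when creating a producer:

```java

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

// The full topic name spells out the persistence mode, tenant, namespace, and topic
Producer<byte[]> producer = client.newProducer()
        .topic("persistent://my-tenant/my-namespace/my-topic")
        .create();

producer.send("hello".getBytes());
producer.close();
client.close();

```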

> Pulsar also supports ephemeral ([non-persistent](concepts-messaging.md#non-persistent-topics)) message storage.

You can see an illustration of how brokers and bookies interact in the diagram below:

![Brokers and bookies](/assets/broker-bookie.png)


### Ledgers

A ledger is an append-only data structure with a single writer that is assigned to multiple BookKeeper storage nodes, or bookies. Ledger entries are replicated to multiple bookies. Ledgers themselves have very simple semantics, as the sketch after this list illustrates:

* A Pulsar broker can create a ledger, append entries to the ledger, and close the ledger.
* After the ledger has been closed---either explicitly or because the writer process crashed---it can then be opened only in read-only mode.
* Finally, when entries in the ledger are no longer needed, the whole ledger can be deleted from the system (across all bookies).
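
These semantics can be sketched directly against the BookKeeper client API. Note that Pulsar brokers drive ledgers through managed ledgers rather than calling this API by hand, and the ZooKeeper address and password below are placeholders:

```java

import java.util.Enumeration;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;

BookKeeper bookKeeper = new BookKeeper("zk-0:2181");

// Create a ledger (ensemble of 3 bookies, write quorum of 2) and append entries to it
LedgerHandle writer = bookKeeper.createLedger(3, 2, DigestType.MAC, "passwd".getBytes());
long ledgerId = writer.getId();
writer.addEntry("entry-0".getBytes());
writer.close(); // once closed, the ledger can only be opened read-only

// Open the closed ledger and read its entries back
LedgerHandle reader = bookKeeper.openLedger(ledgerId, DigestType.MAC, "passwd".getBytes());
Enumeration<LedgerEntry> entries = reader.readEntries(0, reader.getLastAddConfirmed());
reader.close();

// Delete the whole ledger (across all bookies) once it is no longer needed
bookKeeper.deleteLedger(ledgerId);
bookKeeper.close();

```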

#### Ledger read consistency

The main strength of BookKeeper is that it guarantees read consistency in ledgers in the presence of failures. Since the ledger can only be written to by a single process, that process is free to append entries very efficiently, without needing to obtain consensus. After a failure, the ledger goes through a recovery process that finalizes the state of the ledger and establishes which entry was last committed to the log. After that point, all readers of the ledger are guaranteed to see the exact same content.

#### Managed ledgers

Because BookKeeper ledgers provide a single log abstraction, a library called the *managed ledger* was developed on top of the ledger to represent the storage layer for a single topic. A managed ledger is the abstraction of a stream of messages with a single writer that keeps appending at the end of the stream and multiple cursors that consume the stream, each with its own associated position.

Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:

1. After a failure, a ledger is no longer writable and a new one needs to be created.
2. A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.

### Journal storage

In BookKeeper, *journal* files contain BookKeeper transaction logs. Before making an update to a [ledger](#ledgers), a bookie needs to ensure that a transaction describing the update is written to persistent (non-volatile) storage. A new journal file is created once the bookie starts or the older journal file reaches the journal file size threshold (configured using the [`journalMaxSizeMB`](reference-configuration.md#bookkeeper-journalMaxSizeMB) parameter).

## Pulsar proxy

One way for Pulsar clients to interact with a Pulsar [cluster](#clusters) is by connecting to Pulsar message [brokers](#brokers) directly. In some cases, however, this kind of direct connection is either infeasible or undesirable because the client doesn't have direct access to broker addresses. If you're running Pulsar in a cloud environment or on [Kubernetes](https://kubernetes.io) or an analogous platform, for example, then direct client connections to brokers are likely not possible.

The **Pulsar proxy** provides a solution to this problem by acting as a single gateway for all of the brokers in a cluster. If you run the Pulsar proxy (which, again, is optional), all client connections with the Pulsar cluster will flow through the proxy rather than communicating with brokers.

> For the sake of performance and fault tolerance, you can run as many instances of the Pulsar proxy as you'd like.

Architecturally, the Pulsar proxy gets all the information it requires from ZooKeeper. When starting the proxy on a machine, you only need to provide ZooKeeper connection strings for the cluster-specific and instance-wide configuration store clusters. Here's an example:

```bash

$ bin/pulsar proxy \
--zookeeper-servers zk-0,zk-1,zk-2 \
--configuration-store-servers zk-0,zk-1,zk-2

```

> #### Pulsar proxy docs
> For documentation on using the Pulsar proxy, see the [Pulsar proxy admin documentation](administration-proxy).

Some important things to know about the Pulsar proxy:

* Connecting clients don't need to provide *any* specific configuration to use the Pulsar proxy. You won't need to update the client configuration for existing applications beyond updating the IP used for the service URL (for example, if you're running a load balancer over the Pulsar proxy), as the sketch after this list shows.
* [TLS encryption](security-tls-transport.md) and [authentication](security-tls-authentication) are supported by the Pulsar proxy.
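
For example, an existing client only needs its service URL changed to point at the proxy (or a load balancer in front of it). This is a hedged sketch and the hostname is a placeholder:

```java

import org.apache.pulsar.client.api.PulsarClient;

// The only client-side change is the service URL, which now targets the proxy
// (with TLS enabled on the proxy, a pulsar+ssl:// URL would be used instead)
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://pulsar-proxy.example.com:6650")
        .build();

```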

## Service discovery

[Clients](getting-started-clients) connecting to Pulsar brokers need to be able to communicate with an entire Pulsar instance using a single URL. Pulsar provides a built-in service discovery mechanism that you can set up using the instructions in the [Deploying a Pulsar instance](deploy-bare-metal.md#service-discovery-setup) guide.

You can use your own service discovery system if you'd like. If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as `http://pulsar.us-west.example.com:8080`, the client needs to be redirected to *some* active broker in the desired cluster, whether via DNS, an HTTP or IP redirect, or some other means.

The diagram below illustrates Pulsar service discovery:

![alt-text](/assets/pulsar-service-discovery.png)

In this diagram, the Pulsar cluster is addressable via a single DNS name: `pulsar-cluster.acme.com`. A [Python client](client-libraries-python), for example, could access this Pulsar cluster like this:

```python

from pulsar import Client

client = Client('pulsar://pulsar-cluster.acme.com:6650')

```

@@ -0,0 +1,9 @@
---
id: concepts-authentication
title: Authentication and Authorization
sidebar_label: "Authentication and Authorization"
original_id: concepts-authentication
---

Pulsar supports a pluggable [authentication](security-overview) mechanism that can be configured at the broker, as well as authorization to identify clients and their access rights on topics and tenants.

@@ -0,0 +1,95 @@
---
id: concepts-clients
title: Pulsar Clients
sidebar_label: "Clients"
original_id: concepts-clients
---

Pulsar exposes a client API with language bindings for [Java](client-libraries-java.md), [Go](client-libraries-go.md), [Python](client-libraries-python.md), [C++](client-libraries-cpp.md) and [C#](client-libraries-dotnet). The client API optimizes and encapsulates Pulsar's client-broker communication protocol and exposes a simple and intuitive API for use by applications.

Under the hood, the current official Pulsar client libraries support transparent reconnection and/or connection failover to brokers, queuing of messages until acknowledged by the broker, and heuristics such as connection retries with backoff.

> #### Custom client libraries
> If you'd like to create your own client library, we recommend consulting the documentation on Pulsar's custom [binary protocol](developing-binary-protocol)

## Client setup phase

When an application wants to create a producer/consumer, the Pulsar client library will initiate a setup phase that is composed of two steps:

1. The client attempts to determine the owner of the topic by sending an HTTP lookup request to a broker. The request can reach any of the active brokers, which, by looking at the (cached) ZooKeeper metadata, will know which broker is serving the topic or, if no broker is serving it, will try to assign it to the least loaded broker.
1. Once the client library has the broker address, it creates a TCP connection (or reuses an existing connection from the pool) and authenticates it. Within this connection, the client and broker exchange binary commands from a custom protocol. At this point the client sends a command to create a producer/consumer to the broker, which complies after validating the authorization policy.

Whenever the TCP connection breaks, the client will immediately re-initiate this setup phase and will keep trying with exponential backoff to re-establish the producer or consumer until the operation succeeds.
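
The HTTP lookup in step 1 can also be issued explicitly through the Java admin client, which is a convenient way to see which broker currently owns a topic. This is a sketch only; the admin URL and topic name are placeholders, and applications normally never need to do this themselves:

```java

import org.apache.pulsar.client.admin.PulsarAdmin;

PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")
        .build();

// Returns the broker service URL that currently serves the topic,
// triggering an assignment if no broker is serving it yet
String brokerUrl = admin.lookups().lookupTopic("persistent://my-tenant/my-namespace/my-topic");
System.out.println(brokerUrl);

admin.close();

```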

## Reader interface

In Pulsar, the "standard" [consumer interface](concepts-messaging.md#consumers) involves using consumers to listen on [topics](reference-terminology.md#topic), process incoming messages, and finally acknowledge those messages when they've been processed. Whenever a new subscription is created, it is initially positioned at the end of the topic (by default), and consumers associated with that subscription will begin reading with the first message created afterwards. Whenever a consumer connects to a topic using a pre-existing subscription, it begins reading from the earliest message un-acked within that subscription. In summary, with the consumer interface, subscription cursors are automatically managed by Pulsar in response to [message acknowledgements](concepts-messaging.md#acknowledgement).

The **reader interface** for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic---rather than a consumer---you need to specify *which* message the reader begins reading from when it connects to a topic. When connecting to a topic, the reader interface enables you to begin with:

* The **earliest** available message in the topic
* The **latest** available message in the topic
* Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for "knowing" this message ID in advance, perhaps fetching it from a persistent data store or cache.

The reader interface is helpful for use cases like using Pulsar to provide effectively-once processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.

Internally, the reader interface is implemented as a consumer using an exclusive, non-durable subscription to the topic with a randomly-allocated name.

[ **IMPORTANT** ]

Unlike subscriptions/consumers, readers are non-durable and will not prevent data in a topic from being deleted, so it is ***strongly*** advised that [data retention](cookbooks-retention-expiry) be configured. If data retention for a topic is not configured for an adequate amount of time, messages that the reader has not yet read might be deleted, causing the reader to essentially skip messages. Configuring data retention for a topic guarantees that the reader has a certain duration in which to read a message.

Note also that a reader can have a "backlog", but this metric only indicates how far behind the reader is; it is not considered in any backlog quota calculations.
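
Because readers do not prevent deletion, retention is the safety net. As a hedged sketch with the Java admin client (the admin URL, namespace, and values are placeholders; the `pulsar-admin namespaces set-retention` CLI achieves the same thing):

```java

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.RetentionPolicies;

PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")
        .build();

// Retain messages for up to 24 hours, with a 512 MB retention size cap
RetentionPolicies retention = new RetentionPolicies(24 * 60, 512);
admin.namespaces().setRetention("my-tenant/my-namespace", retention);

admin.close();

```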

![The Pulsar consumer and reader interfaces](/assets/pulsar-reader-consumer-interfaces.png)

> ### Non-partitioned topics only
> The reader interface for Pulsar cannot currently be used with [partitioned topics](concepts-messaging.md#partitioned-topics).

Here's a Java example that begins reading from the earliest available message on a topic:

```java

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;

// Create a reader on a topic and for a specific message (and onward)
Reader<byte[]> reader = pulsarClient.newReader()
.topic("reader-api-test")
.startMessageId(MessageId.earliest)
.create();

while (true) {
Message message = reader.readNext();

// Process the message
}

```

To create a reader that will read from the latest available message:

```java

Reader<byte[]> reader = pulsarClient.newReader()
    .topic(topic)
    .startMessageId(MessageId.latest)
    .create();

```

To create a reader that will read from some message between earliest and latest:

```java

byte[] msgIdBytes = // Some byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
    .topic(topic)
    .startMessageId(id)
    .create();

```
