
PIP 41: Pluggable Protocol Handler

Sijie Guo edited this page Mar 2, 2020 · 4 revisions
  • Status: Accepted
  • Author: Jia Zhai, Sijie Guo
  • Pull Request:
  • Mailing List discussion:
  • Release: 2.5.0

Design Proposal:

Motivation

As a cloud-native messaging system, Pulsar is designed with a modern layered architecture. It provides many reusable components, such as the load manager, namespace bundle distribution, topic lookup, and the streaming storage abstraction. These components can be reused to support other messaging protocols such as Kafka, AMQP, and MQTT. In PIP-42: KoP - Kafka on Pulsar, we will implement the Kafka protocol natively in the Pulsar broker. This PIP proposes a pluggable protocol handler mechanism in the Pulsar broker, so that the broker can dynamically load additional protocol handlers at runtime and support other messaging protocols. It also allows developers to extend Pulsar's capabilities to other messaging domains by leveraging the benefits of the Pulsar architecture.

Scope

It is almost impossible to define a perfect protocol handler interface on the first attempt, so this PIP focuses mainly on defining the lifecycle of protocol handlers. A protocol handler is therefore given direct access to all components in Pulsar's BrokerService. We will NOT attempt to define a clear boundary between the components that a protocol handler can access and those it cannot. We will defer defining that boundary until this interface has been used to implement at least two messaging protocols, such as Kafka and MQTT.

Protocol Handler

The main interface of ProtocolHandler is defined as below:

/**
* The protocol handler interface for supporting additional protocols on Pulsar brokers.
*/
@Beta
public interface ProtocolHandler extends AutoCloseable {

   /**
    * Returns the unique protocol name. For example, `kafka-v2` for protocol handler for Kafka v2 protocol.
    */
   String protocolName();

   /**
    * Verify whether the protocol handler can speak the given <tt>protocol</tt>.
    *
    * @param protocol the protocol to verify
    * @return true if the protocol handler can handle the given protocol, otherwise false.
    */
   boolean accept(String protocol);

   /**
    * Initialize the protocol handler after it has been constructed through reflection.
    *
    * @param conf broker service configuration
    * @throws Exception when fail to initialize the protocol handler.
    */
   void initialize(ServiceConfiguration conf) throws Exception;

   /**
    * Retrieve the protocol related data to advertise as part of
    * {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
    *
    * <p>For example, when implementing a Kafka protocol handler, you need to advertise
    * corresponding Kafka listeners so that Pulsar brokers understand how to give back
    * the listener information when handling metadata requests.
    *
    * @return the protocol related data to be advertised as part of LocalBrokerData.
    */
   String getProtocolDataToAdvertise();

   /**
    * Start the protocol handler with the provided broker service.
    *
    * <p>The broker service provides access to Pulsar components such as the load
    * manager, the namespace service, and the managed ledger.
    *
    * @param service the broker service to start with.
    */
   void start(BrokerService service);

   /**
    * Create the list of channel initializers for the ports that this protocol handler
    * will listen on.
    *
    * @return the list of channel initializers for the ports that this protocol handler listens on.
    */
   Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers();

   @Override
   void close();
}

A protocol handler goes through the following lifecycle phases in order:

INITIALIZE -> ADVERTISE -> START -> STOP

INITIALIZE

A protocol handler is constructed using ServiceLoader, so an implementation must provide a public no-argument constructor. After a protocol handler is constructed, the broker checks whether it can handle a given protocol by calling the #accept(String) method.

If a protocol handler can accept the given protocol, the protocol handler will be picked as a handler to be loaded by the Pulsar broker. Pulsar broker will then initialize the handler using #initialize(ServiceConfiguration) method.

The implementation of #initialize should initialize the handler with the provided configuration.

ADVERTISE

In many messaging protocols, a broker needs to advertise its listening ports and protocols to a discovery service so that messaging clients know how to connect to it correctly.

An implementation of a protocol handler therefore needs to implement getProtocolDataToAdvertise to return the protocol-specific data to advertise to Pulsar’s topic discovery system. Pulsar brokers and clients can then use this data to learn how to connect to the brokers. The Protocol Data section explains how this protocol-specific data is stored in Pulsar’s topic discovery system.

In KoP’s implementation, we advertise Kafka’s listeners as the Kafka protocol data, so that the Kafka protocol handler can return the correct Kafka broker listener information when handling topic metadata requests.
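As a rough illustration of the ADVERTISE phase, the sketch below builds the kind of string a Kafka-style handler might return from getProtocolDataToAdvertise. The class name, the helper method, the host name, and the `SECURITY_PROTOCOL://host:port` listener format are all assumptions made for this example; the PIP itself only requires an opaque string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Sketch only: builds the opaque string a Kafka-style handler might advertise. */
final class AdvertiseSketch {

    /**
     * Joins one listener entry per security protocol into a single string.
     * The entry format is an assumption; Pulsar treats the value as opaque.
     */
    static String protocolDataToAdvertise(String advertisedHost,
                                          Map<String, Integer> portsBySecurityProtocol) {
        StringJoiner listeners = new StringJoiner(",");
        for (Map.Entry<String, Integer> entry : portsBySecurityProtocol.entrySet()) {
            listeners.add(entry.getKey() + "://" + advertisedHost + ":" + entry.getValue());
        }
        return listeners.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output order is stable.
        Map<String, Integer> ports = new LinkedHashMap<>();
        ports.put("PLAINTEXT", 9092);
        ports.put("SSL", 9093);
        System.out.println(protocolDataToAdvertise("broker-1", ports));
    }
}
```

A real handler would presumably derive the host and ports from the ServiceConfiguration it received in #initialize rather than from hard-coded values.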

START

In the START phase, the Pulsar broker starts a protocol handler via #start(BrokerService). The implementation can set up the resources that the protocol handler requires, and it gets access to Pulsar components via the passed-in BrokerService instance.

Once the protocol handler is started, the Pulsar broker calls newChannelInitializers() to obtain the channel initializers that the protocol handler provides. The returned value is a map from listening address to channel initializer, because a messaging protocol might listen on multiple ports for different security protocols.
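To make the shape of that map concrete, here is a minimal sketch that uses only the JDK. PipelineSetup is a hypothetical stand-in for Netty's ChannelInitializer<SocketChannel>, and the port numbers and pipeline descriptions are illustrative assumptions, not values taken from KoP.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: one listening address per security protocol. */
final class ChannelInitializersSketch {

    /** Hypothetical stand-in for Netty's ChannelInitializer<SocketChannel>. */
    interface PipelineSetup {
        String describe();
    }

    /** Returns one entry per listening address: plaintext and TLS on different ports. */
    static Map<InetSocketAddress, PipelineSetup> newChannelInitializers(int plainPort, int tlsPort) {
        Map<InetSocketAddress, PipelineSetup> initializers = new LinkedHashMap<>();
        initializers.put(new InetSocketAddress(plainPort), () -> "decoder -> handler");
        initializers.put(new InetSocketAddress(tlsPort), () -> "tls -> decoder -> handler");
        return initializers;
    }

    public static void main(String[] args) {
        // A broker would bind one server socket per entry; this sketch only prints them.
        newChannelInitializers(9092, 9093).forEach((address, setup) ->
                System.out.println(address.getPort() + ": " + setup.describe()));
    }
}
```

Keying the map by address lets the broker own the server sockets while each handler only describes how a new connection on a given port should be set up.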

STOP

The Pulsar broker calls #close() to shut down a protocol handler. The implementation should clean up the resources it uses.
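The four phases can be sketched end to end as follows. This is not the broker's actual code: Handler is a simplified, hypothetical mirror of ProtocolHandler in which Properties replaces ServiceConfiguration, Object replaces BrokerService, and listeningPorts() replaces newChannelInitializers(), so that the example runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Sketch only: drives a handler through the lifecycle phases in order. */
final class LifecycleSketch {

    /** Simplified, hypothetical mirror of ProtocolHandler; Pulsar and Netty types are replaced. */
    interface Handler extends AutoCloseable {
        void initialize(Properties conf) throws Exception;
        String getProtocolDataToAdvertise();
        void start(Object brokerService);
        List<Integer> listeningPorts(); // stand-in for newChannelInitializers()
        @Override
        void close();
    }

    /** Records each call so that the order is visible. */
    static final class RecordingHandler implements Handler {
        final List<String> calls = new ArrayList<>();
        public void initialize(Properties conf) { calls.add("initialize"); }
        public String getProtocolDataToAdvertise() { calls.add("advertise"); return "data"; }
        public void start(Object brokerService) { calls.add("start"); }
        public List<Integer> listeningPorts() {
            calls.add("newChannelInitializers");
            return Collections.singletonList(9092);
        }
        public void close() { calls.add("close"); }
    }

    /** Runs INITIALIZE, ADVERTISE, START, and STOP against one handler. */
    static void runLifecycle(Handler handler) {
        try (Handler h = handler) {          // STOP: close() runs when this block exits
            h.initialize(new Properties());  // INITIALIZE
            h.getProtocolDataToAdvertise();  // ADVERTISE
            h.start(new Object());           // START
            h.listeningPorts();              // listeners are requested after start
        } catch (Exception e) {
            throw new IllegalStateException("protocol handler failed", e);
        }
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        runLifecycle(handler);
        System.out.println(handler.calls);
    }
}
```

In a real broker the handler would stay started until broker shutdown; the try-with-resources block closes it immediately only to keep the sketch short.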

Configuration

messagingProtocols is a new setting in the Pulsar broker that configures the list of protocols the broker loads in addition to the Pulsar protocol handler.

ProtocolHandler implementations are loaded using ServiceLoader, so the Pulsar broker gets a list of candidate protocol handlers when it starts up. The broker tests every candidate to see whether it supports one of the messaging protocols configured in messagingProtocols, and it initializes only the protocol handlers for the configured protocols.

For example, if ServiceLoader finds an MQTT protocol handler and a Kafka protocol handler but the broker is configured to use only the Kafka handler, the Pulsar broker initializes only the Kafka handler.
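The selection step might look roughly like the sketch below. It assumes that messagingProtocols is a comma-separated value and that a handler accepts exactly its own protocol name; both are assumptions for illustration. Handler and NamedHandler are hypothetical stand-ins, and the candidates are hard-coded rather than discovered through ServiceLoader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: picks the candidate handlers that match the configured protocols. */
final class HandlerSelectionSketch {

    /** Hypothetical subset of ProtocolHandler: only what selection needs. */
    interface Handler {
        String protocolName();
        boolean accept(String protocol);
    }

    /** Handler that accepts exactly its own protocol name, ignoring case. */
    static final class NamedHandler implements Handler {
        private final String name;
        NamedHandler(String name) { this.name = name; }
        public String protocolName() { return name; }
        public boolean accept(String protocol) { return name.equalsIgnoreCase(protocol); }
    }

    /** Parses a comma-separated messagingProtocols value (format assumed). */
    static Set<String> parseProtocols(String setting) {
        Set<String> protocols = new LinkedHashSet<>();
        for (String part : setting.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                protocols.add(trimmed);
            }
        }
        return protocols;
    }

    /** Keeps only the candidates that accept at least one configured protocol. */
    static List<Handler> select(Iterable<Handler> candidates, Set<String> configured) {
        List<Handler> selected = new ArrayList<>();
        for (Handler candidate : candidates) {
            if (configured.stream().anyMatch(candidate::accept)) {
                selected.add(candidate);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // In the broker the candidates would come from ServiceLoader; here they are hard-coded.
        List<Handler> candidates = Arrays.asList(new NamedHandler("mqtt"), new NamedHandler("kafka"));
        for (Handler handler : select(candidates, parseProtocols("kafka"))) {
            System.out.println(handler.protocolName());
        }
    }
}
```

Only the handlers returned by select would go on to the INITIALIZE phase; the MQTT candidate in this example is constructed but never initialized.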

Protocol Data

We introduce a new field, protocols, in LocalBrokerData for storing the protocol-specific data of the registered protocols.

private Map<String, String> protocols;

The key of the map is the protocol name. The value of the map is an opaque string for storing the protocol-specific data. Pulsar doesn’t need to know how to interpret the value. Only the protocol implementation knows how to interpret the value.
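The division of responsibility described above can be sketched as follows. BrokerData is a hypothetical stand-in for LocalBrokerData reduced to the new field, and the value formats for both protocols are invented for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: the broker stores opaque values, the handler interprets its own. */
final class ProtocolDataSketch {

    /** Hypothetical stand-in for LocalBrokerData, reduced to the new field. */
    static final class BrokerData {
        private final Map<String, String> protocols = new LinkedHashMap<>();

        /** Stores a handler's data under its protocol name without inspecting it. */
        void putProtocolData(String protocolName, String opaqueData) {
            protocols.put(protocolName, opaqueData);
        }

        Map<String, String> getProtocols() {
            return protocols;
        }
    }

    /** Only the Kafka handler knows that its value is a listener list (format assumed). */
    static String[] parseKafkaListeners(String opaqueData) {
        return opaqueData.split(",");
    }

    public static void main(String[] args) {
        BrokerData data = new BrokerData();
        data.putProtocolData("kafka", "PLAINTEXT://broker-1:9092,SSL://broker-1:9093");
        data.putProtocolData("mqtt", "{\"port\":1883}");

        // The broker passes the value through; the handler decodes its own entry.
        for (String listener : parseKafkaListeners(data.getProtocols().get("kafka"))) {
            System.out.println(listener);
        }
    }
}
```

Because the value is a plain string, each protocol is free to choose its own encoding without any change to the broker data schema.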

Compatibility, Deprecation, and Migration Plan

This change introduces a new protocol handler mechanism in Pulsar. It doesn’t touch any existing wire protocol or storage format, so no compatibility, deprecation, or migration plan is needed for existing Pulsar applications.

Existing tests cover the code changes related to the refactoring.

Test Plan

Existing tests cover the code changes related to the refactoring. New tests will be added for the pluggable protocol handler.

Rejected Alternatives

Messaging Gateway

Issue #2556 attempts to introduce a messaging gateway / proxy to support different messaging protocols. Its goal is similar to that of this proposal, but it adds the support in the proxy layer. Doing this in the gateway/proxy layer has a few drawbacks. First, it doesn’t leverage the advantages that the streaming storage layer already provides (e.g. fencing and cursor management), which makes it difficult to support protocols such as Kafka. Second, it introduces additional overhead (e.g. network bandwidth), which is not suitable for high-volume streaming workloads. Lastly, it adds to the complexity of managing the system.
