
PIP 100: Pulsar pluggable topic factory

Rajan Dhabalia edited this page Sep 29, 2021 · 2 revisions

Introduction

In Pulsar, the topic is the key entity that connects producers and consumers. The Topic interface provides the API to publish (persist) and dispatch messages in the broker. The broker provides two default implementations of a topic: the persistent topic and the non-persistent topic. As the names suggest, a persistent topic persists every message and provides durability, whereas a non-persistent topic does not persist messages and does not provide durability.

Users select a topic type based on the application's use case and the topic behavior it requires. In some circumstances, however, users need additional behavior on top of the existing implementations, or want to inject a custom workflow into existing topic behavior. This need arises mostly when users want to migrate topics or Pulsar clusters smoothly, without impacting producer and consumer applications. In such scenarios, users can override the topic's publish or dispatch behavior and plug in the additional workflow. For example, a topic could perform dual writes to multiple topics during a migration, skip messages published from a specific source without explicit publish failures, or ignore a specific subscription source without generating a client-side error. This feature is useful for any kind of migration in which the Pulsar cluster is serving live topics and requires custom topic-level treatment for a seamless server-side migration, without impacting client applications, especially legacy applications that are hard to change.

Proposal

Broker-side changes

To create custom topics, the broker will support a pluggable topic creation factory that users can implement. The topic returned by the factory must be a persistent or non-persistent topic type; otherwise, the broker falls back to the default implementation.
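The fallback rule can be sketched in plain Java. `Topic`, `PersistentTopic`, `SimpleTopicFactory`, and `TopicCreator` below are simplified, hypothetical stand-ins for the broker types (the proposed factory also receives a `ManagedLedger` and a `BrokerService`), and treating a null result or an exception as grounds for fallback is an assumption that goes beyond the proposal's type check.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins for the broker's topic types (not Pulsar's real classes).
interface Topic {
    String getName();
}

class PersistentTopic implements Topic {
    private final String name;

    PersistentTopic(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }
}

// Same generic shape as the proposed TopicFactory, without ManagedLedger/BrokerService.
interface SimpleTopicFactory {
    <T extends Topic> T create(String topic, Class<T> topicClazz);
}

final class TopicCreator {
    private TopicCreator() {}

    // Use the pluggable factory if one is configured; fall back to the default topic when
    // the factory is absent, throws, returns null, or returns an object of the wrong type.
    static <T extends Topic> T createTopic(SimpleTopicFactory factory, String topic,
                                           Class<T> topicClazz, Supplier<T> defaultTopic) {
        if (factory == null) {
            return defaultTopic.get();
        }
        try {
            Topic custom = factory.create(topic, topicClazz);
            if (topicClazz.isInstance(custom)) {
                return topicClazz.cast(custom);
            }
        } catch (RuntimeException e) {
            // Assumption: a misbehaving plugin should not prevent the topic from loading.
        }
        return defaultTopic.get();
    }
}
```

Checking with `topicClazz.isInstance` rather than trusting an unchecked `(T)` cast is what lets the broker detect a result of the wrong type before handing it to callers.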

Configuration and interface definition

Broker configuration

// Fully qualified class name of the TopicFactory implementation used to create topics
private String topicFactoryClassName;
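The setting names a class for the broker to instantiate. A minimal sketch of how such a class name could be turned into a factory instance by reflection, assuming the class has a public no-argument constructor and that an unset value means "no custom factory". `TopicFactoryLoader` is hypothetical, not Pulsar code; in `broker.conf` the setting would presumably appear as `topicFactoryClassName=<fully qualified class name>`.

```java
// Hypothetical loader for the topicFactoryClassName setting (not Pulsar's actual code).
final class TopicFactoryLoader {
    private TopicFactoryLoader() {}

    // Returns null when no class is configured, so the caller keeps the default topics.
    static <F> F load(String className, Class<F> expectedType) {
        if (className == null || className.trim().isEmpty()) {
            return null;
        }
        try {
            Class<?> clazz = Class.forName(className.trim());
            Object instance = clazz.getDeclaredConstructor().newInstance();
            // Reject classes that do not implement the expected interface.
            return expectedType.cast(instance);
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Assumption: fail fast on a misconfigured class name instead of ignoring it.
            throw new IllegalArgumentException("Cannot load topic factory: " + className, e);
        }
    }
}
```

In a broker this would be called once at startup, for example as `TopicFactoryLoader.load(config.getTopicFactoryClassName(), TopicFactory.class)`, where the getter name is likewise an assumption.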

Topic factory class definition

/**
 * Pluggable TopicFactory to create a custom topic with specific behavior in the broker.
 */
public interface TopicFactory {

    <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);
}

Sample Usage

Example 1: Real-time topic or data migration

The example below implements a custom topic factory that creates a topic performing dual writes, which enables real-time topic migration or data migration to another Pulsar cluster without any client-side changes. It serves a similar purpose to PIP-95.

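// Broker imports (Pulsar 2.x); TopicFactory is the interface defined above.
import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;

public class CustomPersistentTopicFactory implements TopicFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
            Class<T> topicClazz) {
        try {
            // If T is not a persistent topic type, the broker falls back to its default implementation.
            return (T) new DualWriteTopic(topic, brokerService, ledger);
        } catch (NamingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static class DualWriteTopic extends PersistentTopic {

        DualWriteTopic(String topic, BrokerService brokerService, ManagedLedger ledger) throws NamingException {
            super(topic, ledger, brokerService);
        }

        @Override
        public void publishMessage(ByteBuf headersAndPayload, PublishContext callback) {
            // (1) Write a copy to the new Pulsar cluster or topic.
            dualWrite(headersAndPayload);
            // (2) Persist locally with the default implementation.
            super.publishMessage(headersAndPayload, callback);
        }

        private void dualWrite(ByteBuf headersAndPayload) {
            // Hypothetical helper, not part of Pulsar: forward the message to the migration target.
            // If the forward is asynchronous, retain() the buffer and release() it when done.
        }
    }
}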
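Example 1 depends on broker classes, so it cannot run on its own. The same decorator pattern can be sketched with hypothetical stand-in types (`MessageSink`, `RecordingSink`, `DualWriteSink`), where strings stand in for `ByteBuf` payloads. One design choice here is an assumption, not part of the proposal: a failed copy to the migration target is counted rather than propagated, so the client's publish still succeeds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a topic's publish path (not Pulsar's Topic interface).
interface MessageSink {
    void publish(String message);
}

// Records what it receives; stands in for local persistence or a remote topic.
class RecordingSink implements MessageSink {
    final List<String> received = new ArrayList<>();

    @Override
    public void publish(String message) {
        received.add(message);
    }
}

// Same shape as DualWriteTopic.publishMessage: copy to the migration target first,
// then run the default (local) publish path.
class DualWriteSink implements MessageSink {
    private final MessageSink primary;
    private final MessageSink migrationTarget;
    private int failedCopies = 0;

    DualWriteSink(MessageSink primary, MessageSink migrationTarget) {
        this.primary = primary;
        this.migrationTarget = migrationTarget;
    }

    @Override
    public void publish(String message) {
        try {
            migrationTarget.publish(message);
        } catch (RuntimeException e) {
            // Assumption: a failed copy must not fail the client's publish; count it for later repair.
            failedCopies++;
        }
        primary.publish(message);
    }

    int failedCopies() {
        return failedCopies;
    }
}
```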

Example 2: Skip published messages from a specific source without client failure

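The example below shows a custom topic, created by the topic factory, that skips messages from a specific client source: instead of failing the publish, it acknowledges the message to the producer without persisting it to disk. The override belongs in a PersistentTopic subclass such as DualWriteTopic from Example 1.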

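@Override
public void publishMessage(ByteBuf headersAndPayload, PublishContext callback) {
    // Find the producer that sent this message to read its authenticated role.
    Producer producer = this.getProducers().get(callback.getProducerName());
    String roleName = producer != null ? producer.getCnx().getAuthRole() : null;
    // Ignore messages from a specific source without failing them: acknowledge to the
    // client with placeholder ledger/entry ids. skipMessage is a hypothetical helper.
    if (roleName != null && skipMessage(roleName)) {
        callback.completed(null, 0L, 0L);
        return;
    }
    // Otherwise, persist with the default implementation.
    super.publishMessage(headersAndPayload, callback);
}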
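A self-contained sketch of Example 2's skip rule, using hypothetical stand-ins: `PublishCallback` mirrors the `completed(Exception, long, long)` call used above, and `RoleFilteringTopic` keeps stored messages in a list instead of a managed ledger. A skipped message is acknowledged with the same placeholder ids (0, 0) as in Example 2; the ids reported for stored messages are placeholders too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for PublishContext: receives the outcome of a publish.
interface PublishCallback {
    void completed(Exception error, long ledgerId, long entryId);
}

// Mirrors Example 2: messages from skipped roles are acknowledged but never stored.
class RoleFilteringTopic {
    private final Set<String> skippedRoles;
    final List<String> stored = new ArrayList<>();

    RoleFilteringTopic(Set<String> skippedRoles) {
        this.skippedRoles = skippedRoles;
    }

    void publish(String role, String message, PublishCallback callback) {
        if (role != null && skippedRoles.contains(role)) {
            // Report success without persisting, so the client sees no failure.
            callback.completed(null, 0L, 0L);
            return;
        }
        stored.add(message);
        // Placeholder position: the list index stands in for a real entry id.
        callback.completed(null, 1L, stored.size() - 1);
    }
}
```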

This feature lets users implement different mechanisms to inject custom topic behavior during any kind of migration.

Client-side changes:

This feature does not require any client-side changes.

Note:

Experimental feature:

This feature and the TopicFactory interface are considered experimental, and the TopicFactory interface definition may change in a future release.
