
PIP 66: Pulsar Function Mesh

Sijie Guo edited this page Jun 16, 2020 · 3 revisions
  • Status: Proposal
  • Author: Neng Lu
  • Pull Request:
  • Mailing List discussion:
  • Release:


Pulsar Functions has been a convenient tool for simple real-time message processing scenarios. It allows users to implement business logic against a minimal interface and submit it directly to an existing Pulsar cluster for execution. The simplicity is twofold: (1) a simple interface and (2) simple deployment. Users don't need to learn a complicated bundle of new interfaces to express simple jobs, and they don't need to set up and maintain a separate stream processing cluster to run the functions.

Now that functions have been in use for some time, we have found that native support for organizing multiple functions together is in demand. With this support, users can express and manage multi-stage jobs easily. It also opens the door to a higher-level DSL that further simplifies job composition. We call this new feature Pulsar Function Mesh.

In the following sections, we will discuss the design of the Pulsar Function Mesh.


Pulsar Function Mesh reuses the existing Pulsar Function as the fundamental computing unit. Building on the existing function runtimes, we need to add or change the following parts to support Function Mesh:

  1. A function-mesh definition that allows users to express a mesh
  2. The pulsar-admin client tool, which accepts user operations on a mesh
  3. The Pulsar broker, which handles incoming mesh-related requests
  4. A function-mesh runtime manager, which manages submitted meshes and their interactions with the function runtime

These changes require minimal development work while enabling users to manage function meshes within the existing Pulsar cluster.

Mesh Definition

For the definition, the user provides a YAML file to express the composition of the Mesh. The major fields are listed in the following example file:

# Metadata 
name: PIP_Mesh
namespace: PIP_Namespace
tenant: PIP_Tenant

# Function Mesh configs
jarFile: /local/jar/files/example.jar

# Functions
functions:
  - name: Func1
    classname: org.apache.pulsar.functions.api.examples.ExclamationFunction
    replicas: 1
    inputs:
      - pulsar_topic_source
    output:
      - pulsar_topic_1
  - name: Func2
    classname: org.apache.pulsar.functions.api.examples.ExclamationFunction
    replicas: 1
    inputs:
      - pulsar_topic_1
    output:
      - pulsar_topic_result

The above YAML file describes a Function Mesh in which Func1 reads the original data from pulsar_topic_source and processes it. After processing, the data is pushed to another Pulsar topic, pulsar_topic_1. Func2 then processes that data and pushes the final result to pulsar_topic_result for others to access. The topology of this Function Mesh is as follows:

Source_Topic ------> Func_1 ------> Topic_1 ------> Func_2 ------> Sink_Topic
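
The topology above can be derived mechanically from the definition: an edge exists wherever one function's output topic is another function's input topic. The following Java sketch illustrates the idea. `MeshFunction` and `MeshTopology` are hypothetical names introduced here for illustration; they are not existing Pulsar classes, and a single output topic per function is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one function entry in the mesh definition (not a Pulsar class).
final class MeshFunction {
    final String name;
    final List<String> inputs;
    final String output; // assumption: exactly one output topic per function

    MeshFunction(String name, List<String> inputs, String output) {
        this.name = name;
        this.inputs = inputs;
        this.output = output;
    }
}

final class MeshTopology {
    // Returns one edge per topic that is written by one function and read by another.
    static List<String> edges(List<MeshFunction> functions) {
        List<String> result = new ArrayList<>();
        for (MeshFunction producer : functions) {
            for (MeshFunction consumer : functions) {
                if (producer != consumer && consumer.inputs.contains(producer.output)) {
                    result.add(producer.name + " --" + producer.output + "--> " + consumer.name);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<MeshFunction> mesh = Arrays.asList(
            new MeshFunction("Func1", Arrays.asList("pulsar_topic_source"), "pulsar_topic_1"),
            new MeshFunction("Func2", Arrays.asList("pulsar_topic_1"), "pulsar_topic_result"));
        // Prints the internal edges of the example mesh, one per line.
        for (String edge : edges(mesh)) {
            System.out.println(edge);
        }
    }
}
```

Topics that appear only as an input (the source) or only as an output (the result) produce no edge, which matches their role as the external boundary of the mesh.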

Note that each Pulsar Function Mesh is self-contained and should not reference a function instance in another Function Mesh. This keeps the complexity of a single job, and of all jobs across a namespace, bounded. If a mesh needs to interact with another mesh, it is enough to create a new mesh that consumes the Pulsar topics produced by the other.

CLI and Endpoint

The pulsar-admin client tool will gain a new subcommand that allows users to submit a Function Mesh to the Pulsar cluster. At a minimum, the create, delete, and list commands are needed; more commands can be added later as required.

bin/pulsar-admin function-mesh create -f mesh.yaml
bin/pulsar-admin function-mesh delete --tenant public --namespace default --name pip-mesh
bin/pulsar-admin function-mesh list --tenant public --namespace default

Corresponding to the new CLI commands, the HTTP admin endpoint also needs to handle these requests. More endpoint APIs can be added as needed.

 public void registerFunctionMesh(...)
 public void deregisterFunctionMesh(...)
 public List<String> listFunctionMesh(...)
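
To make the intended semantics of these three operations concrete, here is a minimal in-memory sketch in Java. The parameter lists are elided in this proposal, so the ones used below (tenant, namespace, name, and a raw definition string) are assumptions, as is the class name `InMemoryFunctionMeshRegistry`. A real endpoint would persist mesh metadata rather than hold it in a map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the mesh admin endpoint; all parameters are assumptions.
final class InMemoryFunctionMeshRegistry {
    // Key: "tenant/namespace/name"; value: the raw mesh definition (for example, the YAML text).
    private final Map<String, String> meshes = new TreeMap<>();

    private static String key(String tenant, String namespace, String name) {
        return tenant + "/" + namespace + "/" + name;
    }

    public void registerFunctionMesh(String tenant, String namespace, String name, String definition) {
        String k = key(tenant, namespace, name);
        if (meshes.containsKey(k)) {
            throw new IllegalArgumentException("Mesh already exists: " + k);
        }
        meshes.put(k, definition);
    }

    public void deregisterFunctionMesh(String tenant, String namespace, String name) {
        String k = key(tenant, namespace, name);
        if (meshes.remove(k) == null) {
            throw new IllegalArgumentException("Mesh not found: " + k);
        }
    }

    // Lists mesh names within one tenant/namespace, in sorted order.
    public List<String> listFunctionMesh(String tenant, String namespace) {
        String prefix = tenant + "/" + namespace + "/";
        List<String> names = new ArrayList<>();
        for (String k : meshes.keySet()) {
            if (k.startsWith(prefix)) {
                names.add(k.substring(prefix.length()));
            }
        }
        return names;
    }
}
```

Rejecting a second registration under the same tenant, namespace, and name is one possible policy; it mirrors the "no duplicated named meshes" check that this proposal assigns to mesh validation.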


We introduce a FunctionMeshManager in the Function Worker to manage the metadata of a Function Mesh. To run the functions inside the mesh, we largely reuse the existing function runtime to execute each function unit. The architecture is as follows:

![image-20200615195438887](/Users/nlu/Library/Application Support/typora-user-images/image-20200615195438887.png)

The FunctionMeshManager is the entry point for a submitted mesh and is mainly responsible for the following tasks:

  1. Handling the submitted jar files.
  2. Verifying that the mesh is valid: the functions' classes are available in the jar, the structure of the mesh is correct, no two meshes share a name, etc.
  3. Generating the individual functions in a mesh and submitting them to the Function submission topic.
  4. Managing the mesh based on the user's operations.
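
The validation task in the list above could look roughly like the following Java sketch. It is illustrative only: `MeshValidator` and `FunctionSpec` are hypothetical names, and the set of class names found in the jar is assumed to have been extracted beforehand. The check for unique function names within a mesh is one assumed reading of "the structure of the mesh is correct".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical validator; not part of the existing Function Worker.
final class MeshValidator {
    // Minimal view of a function entry: its name and implementing class.
    static final class FunctionSpec {
        final String name;
        final String classname;

        FunctionSpec(String name, String classname) {
            this.name = name;
            this.classname = classname;
        }
    }

    // Returns a list of human-readable problems; an empty list means the mesh passed these checks.
    static List<String> validate(String meshName,
                                 List<FunctionSpec> functions,
                                 Set<String> existingMeshNames,
                                 Set<String> classesInJar) {
        List<String> errors = new ArrayList<>();
        if (existingMeshNames.contains(meshName)) {
            errors.add("duplicate mesh name: " + meshName);
        }
        if (functions.isEmpty()) {
            errors.add("mesh has no functions");
        }
        Set<String> seenNames = new HashSet<>();
        for (FunctionSpec f : functions) {
            if (!seenNames.add(f.name)) {
                errors.add("duplicate function name: " + f.name);
            }
            if (!classesInJar.contains(f.classname)) {
                errors.add("class not found in jar: " + f.classname);
            }
        }
        return errors;
    }
}
```

Collecting every problem instead of failing on the first one lets the user fix a mesh definition in a single pass.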


One optimization for Function Mesh is to group several functions together in one FunctionRunner process, which reduces both latency and the number of internal Pulsar topics used. With this optimization, the FunctionRunner still has one source and one sink, but executes a list of functions sequentially.
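
A minimal Java sketch of this grouping idea follows. It uses `java.util.function.Function` as a stand-in for the Pulsar function interface, and `ChainedFunctionRunner` is a hypothetical name, not the actual FunctionRunner. The point is only that intermediate results pass in memory from one stage to the next instead of through an internal topic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical runner: one input from the source, one output to the sink, several stages in between.
final class ChainedFunctionRunner {
    private final List<Function<String, String>> stages;

    ChainedFunctionRunner(List<Function<String, String>> stages) {
        this.stages = stages;
    }

    // Applies every stage in order; no intermediate topic is involved.
    String process(String input) {
        String value = input;
        for (Function<String, String> stage : stages) {
            value = stage.apply(value);
        }
        return value;
    }

    public static void main(String[] args) {
        // Stand-in stage that appends an exclamation mark to its input.
        Function<String, String> exclaim = s -> s + "!";
        ChainedFunctionRunner runner = new ChainedFunctionRunner(Arrays.asList(exclaim, exclaim));
        System.out.println(runner.process("hello")); // prints "hello!!"
    }
}
```

The trade-off is that grouped functions share one process and can no longer be scaled independently through their own replicas setting.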
