Skip to content

PIP 74: Pulsar client memory limits

Matteo Merli edited this page Dec 15, 2020 · 2 revisions
  • Status: Proposal
  • Author: Matteo Merli
  • Pull Request:
  • Mailing List discussion:
  • Release:

Motivation

Currently, there are multiple settings in producers and consumers that allow to control the sizes of the message queues. These are ultimately used to control the amount of memory used by the Pulsar client.

There are few issues with the current approach, that makes it complicated, in non-trivial scenarios, to select an overall configuration that leads to a certain use of memory:

 1. The settings are in terms of "number of messages", so one has to adjust     based on the expected message size.

 2. The settings are per producer/consumer. If an application has a large     (or simply unknown) number number of producers/consumer, it's very difficult     to select an appropriate value for queue sizes. The same is true for topics     that have many partitions.

Goal

Allow to specify a maximum amount of memory on a given Pulsar client. The producers and consumers will compete for the memory assigned.

The scope of this proposal is to track the "direct" memory used to hold outgoing or incoming message payloads. The usage of JVM heap memory is outside the scope, though normally it only represents a small fraction of the payloads size.

The change will be done in multiple steps:  1. Introduce new API to set the memory limit, keeping it disabled by default   1.1 Implement memory limit for producers  2. Implement memory limit for consumers  3. (Once the feature is stable)     Enable memory limit by default, with 64 MB  4. Deprecate producer queue size related settings  5. Ignore producer queue size related settings

Client API

public class ClientBuilder {
  // ....

  /**
   * Configure a limit on the amount of direct memory that will be allocated by this client instance.
   * <p>
   * Setting this to 0 will disable the limit.
   *
   * @param memoryLimit
   *            the limit
   * @param unit
   *            the memory limit size unit
   * @return the client builder instance
   */
  ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
}

For example:

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .memoryLimit(64, SizeUnit.MEGA_BYTES)
    .create();

The same logic described here will be implemented first in Java and then in the rest of supported client libraries.

Implementation

The enforcement of the memory limit is done in a different way for producers as compared to consumers, although both will share a common instance of a MemoryLimitController, which is unique per each PulsarClient instance.

interface MemoryLimitController {

    // Non-blocking
    boolean tryReserveMemory(long size);

    // Blocks until memory is available
    void reserveMemory(long size) throws InterruptedException;

    void releaseMemory(long size);
}

The implementation of MemoryLimitController will be optimized to avoid contention across multiple threads trying to reserve and release memory.

Producer

If the memory limit is set, the producer will first try to acquire the semaphore that is currently set based on the producer queue size, then it will try to reserve the memory, either in blocking or non-blocking way, depending on the producer configuration.

Consumer

The logic for consumers is slightly more complicated because a client needs to give permits to brokers to push messages. Right now, the count is done in terms of messages and it cannot simply switch to "bytes" because a consumer will need to consume from multiple topics and partitions.

Because of that, we need to invert the order, by using the memory limit controller as a wait to interact with the flow control, reserving memory after we have already received the messages.

The proposal is to keep the receiver queue size as a configuration, although treat it more as the "max receiver queue size".

When a consumer is created, and memory limit is set, it will use these parameters:  * maxReceiverQueueSize: the values configured by the application  * currentReceiverQueue: a dynamic limit that will be auto adjusted, starting     from an initial value (eg: 1) up to the max.

The goal is to step up the currentReceiverQueue if and only if:  1. Doing it would improve throughput  2. We have enough memory to do it

Controlling throughput

The rationale for (1) is that increasing currentReceiverQueue increases the amount of pre-fetched messages for the consumer. The goal of the prefetch queue is to make sure an application calling receive() will never be blocked waiting, if there are messages available.

Therefore, we can determine that the currentReceiverQueue is limiting the throughput if:

 1. Application calls receive and there are no pre-fetched messages  2. And we know that there are messages pending to be sent for this consumer.     For this, in case of exclusive/failover subscriptions, we can rely on the     last message id, while for other subscription types, we need to introduce     using a new special command to brokers that will tell if broker waiting on     consumer for more permits.

If these conditions are true, we can increase the currentReceiverQueue to increase the load.

With this mechanism, we can start with a very small window and expand as needed, maintaining the minimum size that is able to sustain the requested throughput.

Limiting consumer memory

In order to limit the consumer memory, we would only increase the currentReceiverQueue if the memory usage is below a certain threshold (eg: 75%). Also, if the usage reaches a higher threshold, (eg: 95%) we will reduce the currentReceiverQueue for all the consumers.

Clone this wiki locally