[Issue #11351] Parallel Precise Publish Rate Limiting Fix #11372

Merged

Conversation

ronfarkash
Contributor

@ronfarkash ronfarkash commented Jul 19, 2021

Master Issue: #11351

Motivation

Hello, as far as I know, it is well known that precise publish rate limiting does not function well. I believe this PR fixes problem number 3 stated in the issue above.

@danielsinai:
"3. Rate limit function passed only to the msg/s rate limiter (and that's in order to avoid calling it twice)"

It was passed only to the message rate limiter because there was no way to throttle the connection when only one of the limiters was exceeded.

This PR allows the message rate and byte rate limiters to co-exist, throttling the connection and re-enabling socket reads only when necessary.

Modifications

  • tryAcquire function in PublishRateLimiterDisable now returns true. Previously, when the publish rate was null, this function was called and returned false, throttling the client for no reason. A null publish rate means nobody set one, so there is no reason to throttle any connection.
public boolean tryAcquire(int numbers, long bytes) {
        return true;
}
  • RateLimiter permits and acquiredPermits were changed to volatile.
 private volatile long permits;
 private volatile long acquiredPermits;

This allows multiple threads to read them at the same time.
The synchronized keyword was also removed from the getAvailablePermits() function.

public long getAvailablePermits() {
        return Math.max(0, this.permits - this.acquiredPermits);
}

This is required; otherwise a thread deadlock occurs.

  • Created ~a HashMap to manage the byte and message rate limiters, and~ a function releaseThrottle() to handle re-enabling auto read.
    If either rate limiter has no available permits, auto read from the socket is not re-enabled.
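
To illustrate the modifications above, here is a minimal sketch of the idea, not the code from this PR. The names SketchLimiter and SketchPublishLimiter, the renew() method, and the AtomicBoolean standing in for the connection's auto-read setting are all assumptions made for illustration. The sketch shows volatile counters read without a lock, and a releaseThrottle() that re-enables reading only when both limiters have permits available.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified limiter; not Pulsar's RateLimiter. */
class SketchLimiter {
    // volatile so that other threads can read the latest values without a lock
    private volatile long permits;
    private volatile long acquiredPermits;

    SketchLimiter(long permits) {
        this.permits = permits;
    }

    // Writers stay synchronized so the check-and-increment is atomic.
    synchronized boolean tryAcquire(long n) {
        if (acquiredPermits + n > permits) {
            return false;
        }
        acquiredPermits += n;
        return true;
    }

    // Stands in for the periodic task that starts a new rate window (assumption).
    synchronized void renew() {
        acquiredPermits = 0;
    }

    // Unsynchronized read, mirroring getAvailablePermits() in the description.
    long getAvailablePermits() {
        return Math.max(0, this.permits - this.acquiredPermits);
    }
}

/** Hypothetical holder for both limiters and the auto-read flag. */
class SketchPublishLimiter {
    final SketchLimiter msgLimiter;
    final SketchLimiter byteLimiter;
    // Stands in for the channel's auto-read setting (assumption).
    final AtomicBoolean autoRead = new AtomicBoolean(true);

    SketchPublishLimiter(long maxMessages, long maxBytes) {
        this.msgLimiter = new SketchLimiter(maxMessages);
        this.byteLimiter = new SketchLimiter(maxBytes);
    }

    // Charge both limiters; stop reading if either one is exceeded.
    boolean tryAcquire(int numbers, long bytes) {
        // Non-short-circuit '&' so both limiters are always consulted.
        boolean ok = msgLimiter.tryAcquire(numbers) & byteLimiter.tryAcquire(bytes);
        if (!ok) {
            autoRead.set(false);
        }
        return ok;
    }

    // Re-enable reading only when BOTH limiters have permits available.
    void releaseThrottle() {
        if (msgLimiter.getAvailablePermits() > 0
                && byteLimiter.getAvailablePermits() > 0) {
            autoRead.set(true);
        }
    }
}
```

In this sketch, renewing only the message limiter while the byte limiter is still exhausted leaves the connection throttled, which is the behaviour the last bullet describes.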

Verifying this change

This change is already covered by existing tests, such as PrecisRateLimiterTest.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: no
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no
  • The rest endpoints: no
  • The admin cli options: no
  • Anything that affects deployment: no

Documentation

For contributor

For this PR, do we need to update docs?

No, this PR fixes bugs of existing documented features.

Important Additional Information

This PR fixes some core issues with precise publish rate limiting but depends on another PR, #11446. I would strongly prefer that @danielsinai's PR be merged before this one, since it fixes core issues in publish rate limiting and merging it first avoids unnecessary malfunctions.

@lhotari also has a PR in the works, #10384, that fixes other issues on the same topic.

@ronfarkash ronfarkash changed the title [ISSUE 11351] Parallel Precise Publish Rate Limiting Fix [Issue 11351] Parallel Precise Publish Rate Limiting Fix Jul 19, 2021
@ronfarkash ronfarkash changed the title [Issue 11351] Parallel Precise Publish Rate Limiting Fix [Issue #11351] Parallel Precise Publish Rate Limiting Fix Jul 19, 2021
@Anonymitaet Anonymitaet added the doc-not-needed Your PR changes do not impact docs label Jul 20, 2021
@lhotari lhotari requested review from sijie and eolivelli July 23, 2021 13:52
Member

@lhotari lhotari left a comment

LGTM. Good work @ronfarkash . Thanks for the contribution.

@ronfarkash
Contributor Author

/pulsarbot run-failure-checks

@ronfarkash
Contributor Author

It seems that every time I run it, other tests fail. I believe my changes didn't hurt anything since the tests that are failing are not related to my changes.

@lhotari
Member

lhotari commented Jul 25, 2021

It seems that every time I run it, other tests fail. I believe my changes didn't hurt anything since the tests that are failing are not related to my changes.

Yes, there are quite a few flaky tests. Please report the flaky tests as GitHub issues unless they have already been reported.

@sijie sijie added this to the 2.9.0 milestone Jul 28, 2021
@codelipenghui
Contributor

Error:  Tests run: 69, Failures: 1, Errors: 0, Skipped: 51, Time elapsed: 118.967 s <<< FAILURE! - in org.apache.pulsar.broker.service.persistent.PersistentTopicStreamingDispatcherE2ETest
  Error:  testMessageExpiryWithTopicMessageTTL(org.apache.pulsar.broker.service.persistent.PersistentTopicStreamingDispatcherE2ETest)  Time elapsed: 21.188 s  <<< FAILURE!
  java.lang.AssertionError: expected [500] but found [409]
  	at org.testng.Assert.fail(Assert.java:99)
  	at org.testng.Assert.failNotEquals(Assert.java:1037)
  	at org.testng.Assert.assertEqualsImpl(Assert.java:140)
  	at org.testng.Assert.assertEquals(Assert.java:122)
  	at org.testng.Assert.assertEquals(Assert.java:907)
  	at org.testng.Assert.assertEquals(Assert.java:917)
  	at org.apache.pulsar.broker.service.PersistentTopicE2ETest.testMessageExpiryWithTopicMessageTTL(PersistentTopicE2ETest.java:1008)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
  	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
  	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
  	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  	at java.base/java.lang.Thread.run(Thread.java:829)

@ronfarkash
Contributor Author

Is there anything else that needs to be done? Does this failing test prevent this PR from being merged even though it is unrelated to my changes?

@sijie sijie merged commit 7f2ca8f into apache:master Jul 29, 2021
codelipenghui pushed a commit that referenced this pull request Jul 30, 2021
(cherry picked from commit 7f2ca8f)
@codelipenghui codelipenghui added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Jul 30, 2021
michaeljmarshall pushed a commit that referenced this pull request Dec 10, 2021
(cherry picked from commit 7f2ca8f)
@michaeljmarshall michaeljmarshall added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Dec 10, 2021
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jan 26, 2022
…he#11372)

(cherry picked from commit 7f2ca8f)
(cherry picked from commit ab5fb72)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…he#11372)

Labels
cherry-picked/branch-2.7 (Archived: 2.7 is end of life), cherry-picked/branch-2.8 (Archived: 2.8 is end of life), doc-not-needed (Your PR changes do not impact docs), release/2.7.4, release/2.8.1

6 participants