Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState #22572

Merged
merged 3 commits into from Apr 28, 2024

Conversation

shibd
Copy link
Member

@shibd shibd commented Apr 24, 2024

Motivation

#22571

Analysis

When enabling replicateSubscriptionState will use topic to sync subscription state, and make these message metadata as Marker

These marker messages will not be sent to the consumer by the topic, and will automatically ack them.

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
final int readerIndex = metadataAndPayload.readerIndex();
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
metadataAndPayload.readerIndex(readerIndex);
}
// Deliver marker to __compaction cursor to avoid compaction task stuck,
// and filter out them when doing topic compaction.
if (msgMetadata == null || cursor == null
|| !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
entries.set(i, null);
entry.release();
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
Collections.emptyMap());
continue;
}

But getLastMessageId will always return the last message position, regardless of whether the last message is marked or not. This will cause the reader stuck.

        while (reader.hasMessageAvailable()) {  // get true
              Message message reader.readNext();  // never can't receive msg.
        }

You can refer to this diagram to help understand this bug:

image

Modifications

  • Add asyncReverseFindPositionOneByOne method on ManagedLedger .
  • Add getLastCanDispatchPosition method on Topic, it will call asyncReverseFindPositionOneByOne to find the last position of entry that not is replistateSubscriptionState
  • Change the getLastMessageId implement of ServerCnx to use getLastCanDispatchPosition instead of getMaxReadPosition.

Verifying this change

  • Add ManagedLedgerTest.testReverseFindPositionOneByOne to cover ReverseFindPositionOneByOne method.
  • Add testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage to cover this bug.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

@shibd shibd self-assigned this Apr 24, 2024
@shibd shibd added type/bug The PR fixed a bug or issue reported a bug ready-to-test labels Apr 24, 2024
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 24, 2024
Copy link
Member

@dao-jun dao-jun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good to me, just left some minor comments about the code style

@shibd shibd requested a review from coderzc April 28, 2024 07:42
Copy link
Member

@coderzc coderzc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@codecov-commenter
Copy link

Codecov Report

Attention: Patch coverage is 80.00000% with 9 lines in your changes are missing coverage. Please review.

Project coverage is 74.13%. Comparing base (bbc6224) to head (f094e04).
Report is 203 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #22572      +/-   ##
============================================
+ Coverage     73.57%   74.13%   +0.56%     
+ Complexity    32624     2747   -29877     
============================================
  Files          1877     1886       +9     
  Lines        139502   140653    +1151     
  Branches      15299    15462     +163     
============================================
+ Hits         102638   104278    +1640     
+ Misses        28908    28331     -577     
- Partials       7956     8044      +88     
Flag Coverage Δ
inttests 27.23% <57.77%> (+2.64%) ⬆️
systests 24.48% <57.77%> (+0.15%) ⬆️
unittests 73.44% <80.00%> (+0.59%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...sar/broker/service/persistent/PersistentTopic.java 78.66% <100.00%> (+0.20%) ⬆️
...va/org/apache/pulsar/broker/service/ServerCnx.java 72.47% <93.33%> (+0.33%) ⬆️
...n/java/org/apache/pulsar/broker/service/Topic.java 34.78% <0.00%> (-1.59%) ⬇️
...ookkeeper/mledger/util/ManagedLedgerImplUtils.java 69.56% <69.56%> (ø)

... and 278 files with indirect coverage changes

@codelipenghui codelipenghui merged commit a761b97 into apache:master Apr 28, 2024
50 checks passed
codelipenghui pushed a commit to codelipenghui/incubator-pulsar that referenced this pull request Apr 28, 2024
… replicateSubscriptionState (apache#22572)

(cherry picked from commit a761b97)
codelipenghui pushed a commit to codelipenghui/incubator-pulsar that referenced this pull request Apr 28, 2024
… replicateSubscriptionState (apache#22572)

(cherry picked from commit a761b97)
codelipenghui pushed a commit that referenced this pull request Apr 28, 2024
… replicateSubscriptionState (#22572)

(cherry picked from commit a761b97)
@dao-jun
Copy link
Member

dao-jun commented Apr 28, 2024

The PR handled the case of ServerOnlyMarker, but it looks we also need to handle txn aborted messages.
I created a PR for improvement, PTAL
#22610

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

9 participants