Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the topic in fenced state and can not recover. #11737

Merged
merged 3 commits into from Aug 25, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -179,7 +179,18 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
public void safeRun() {
// Remove this entry from the head of the pending queue
OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
checkArgument(this == firstInQueue);
if (firstInQueue == null) {
// The pending op might been polled by others such as cleanup pending op when create Ledger failed.
ReferenceCountUtil.release(data);
this.recycle();
return;
}
if (this != firstInQueue) {
firstInQueue.failed(new ManagedLedgerException("Unexpected add entry op when complete the add entry op."));
ReferenceCountUtil.release(data);
this.recycle();
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have one more question about this solution. Would it be necessary to call the callback for the OpAddEntry by using the "failed" method?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. @codelipenghui, can you explain why we call firstInQueue.failed and then release this.data and call this.recycle() without also calling this.failed? Note that the failed method has concurrency controls built in by using the callbackUpdater. Wouldn't we want to use those concurrency controls here as well?

    public void failed(ManagedLedgerException e) {
        AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
        if (cb != null) {
            ReferenceCountUtil.release(data);
            cb.addFailed(e, ctx);
            ml.mbean.recordAddEntryError();
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari @michaeljmarshall As I mentioned in the PR description, there are 2 threads polling the add entry op from the queue, but here poll out the op from the queue but does not call the callback method of the op, this will lead to the pendingWriteOps of the topic can't change to 0 (before adding an entry, pendingWriteOps++. after entry add success for failed, pendingWriteOps--), so we should call the callback of the op here to make sure the pendingWriteOps can change to 0. The root cause of the issue is we missed some op callback calls here because the first op in the queue is not equaled to the current OpAddEntry(the previous behavior just throws an exception). So It's not about concurrency controls of the callbackUpdater.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your clarification. I think I understand why we're calling firstInQueue.failed. I don't understand why we're calling this.recycle without also calling addComplete or failed. If I understand correctly, by calling this.recycle, we're going to set callback = null; without calling its callback. If my understanding is incorrect, can you explain why we're recycling this without calling its callback? Thank you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are 2 threads polling the add entry op from the queue

@codelipenghui The 2 threads polling seem to be part of the problem. Is that so?

One goal of the Managed Ledger design seems to be that there's a single thread per managed ledger executing operations. PR #11387 "Pin executor and scheduled executor threads for ManagedLedgerImpl" fixes some issues where this pinned executor solution wasn't used.

Just curious, would #11387 prevent the 2 threads polling the add entry op from the queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I think I made a wrong change there, the OpAddEntry.failed() has released the data but looks like missed the object recycle, So we don't need to release it again.

So a thread takes the op from the queue, it should call the callback and release the data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari Yes, I think run the clearPendingAddEntries() in the pinned executor can also fix the problem

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I think I made a wrong change there, the OpAddEntry.failed() has released the data but looks like missed the object recycle, So we don't need to release it again.

So a thread takes the op from the queue, it should call the callback and release the data.

@codelipenghui Why isn't this.failed(...) called when this != firstInQueue ?

I would be expecting something like this:

        if (this != firstInQueue) {
            if (firstInQueue != null) {
                firstInQueue.failed(
                        new ManagedLedgerException("Unexpected add entry op when complete the add entry op."));
            }
            this.failed(new ManagedLedgerException("Unexpected add entry op when complete the add entry op."));
            return;
        }

Is there a reason to not call this.failed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui reading your previous answer, I would understand that clearPendingAddEntries is expected to call failed?

Since failed method has concurrency controls, I think it wouldn't cause any harm to call this.failed when this != firstInQueue. No?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, clearPendingAddEntries is expected to call failed.
And yes no harm to call this.failed, I think we only complete the op which comes from the queue is more clear here. All the add entry ops will add the queue, so someone poll the op from the queue, it should make sure the op.callback has been called so that we don't need to care about this.failed, we just need to complete all the ops in the queue.


ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
Expand Down