Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close the old connection to make sure the broker drops the producer on its side #677

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,13 +858,14 @@ func (c *connection) Close() {
c.Lock()
cnx := c.cnx
c.Unlock()
c.changeState(connectionClosed)
c.changeState(connectionClosing)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does changing the state here to connectionClosing and then changing it to connectionClosed later fix the issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not to say that the problem is solved, but first change the state to connectionClosing , the logic is more rigorous, I modified the description of the PR, it does not mean that the issue is fixed, but it is related to investigating this issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no intermediate connectionClosing state, other threads may start to create new connections in GetConnection before the current cnx is really closed. This is a mistake in itself. State transitions should be stricter. You can see that the network connection state transition of the java client is very rigorous.

If we can ensure that the state transition is more rigorous, set the state to connectionClosing before cnx closed, which can prevent other threads to create a new connection. This may be able to solve the issue #676 that the old connection was not closed, and the new connection has initiated a request, even if this is not the root cause of the problem.


if cnx != nil {
_ = cnx.Close()
}

close(c.closeCh)
c.changeState(connectionClosed)

listeners := make(map[uint64]ConnectionListener)
c.listenersLock.Lock()
Expand Down