Skip to content

Commit

Permalink
Close the old connection to make sure the broker drops the producer o…
Browse files Browse the repository at this point in the history
…n its side
  • Loading branch information
wenbingshen committed Dec 3, 2021
1 parent 742f1b1 commit bda4d40
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func (c *connection) Close() {
c.Lock()
cnx := c.cnx
// do not use changeState() since they share the same lock
c.setState(connectionClosed)
c.setState(connectionClosing)
c.cond.Broadcast()
c.Unlock()

Expand All @@ -853,6 +853,13 @@ func (c *connection) Close() {

close(c.closeCh)

c.Lock()
cnx := c.cnx
// do not use changeState() since they share the same lock
c.setState(connectionClosed)
c.cond.Broadcast()
c.Unlock()

listeners := make(map[uint64]ConnectionListener)
c.listenersLock.Lock()
for id, listener := range c.listeners {
Expand Down

0 comments on commit bda4d40

Please sign in to comment.