Disconnect() can hang waiting on c.workers.Wait() #554

Closed
CodeMonkeyLeet opened this issue Oct 26, 2021 · 4 comments

@CodeMonkeyLeet

Related to #550, but the hang is not at the previously noted race on
<-c.commsStopped; it occurs earlier, on line 672 at c.workers.Wait():

paho.mqtt.golang/client.go

Lines 670 to 680 in 88d5334

go func() {
	DEBUG.Println(CLI, "stopCommsWorkers waiting for workers")
	c.workers.Wait()
	// Stopping the workers will allow the comms routines to exit; we wait for these to complete
	DEBUG.Println(CLI, "stopCommsWorkers waiting for comms")
	<-c.commsStopped // wait for comms routine to stop
	DEBUG.Println(CLI, "stopCommsWorkers done")
	close(doneChan)
}()

The repro also does not involve attempting to reuse a client via Disconnect/Reconnect:

  • Create a new client with default options and SetCleanSession(false) (not salient; the hang reproduces either way)
  • Connect the client and call Subscribe with a QoS level 1
  • Call Disconnect on the client as part of teardown

Is there an idiomatic way of releasing the client and any resources it holds other than calling Disconnect on it?

It looks like sometimes after c.stop is signaled as part of Disconnect, the goroutine responsible for flushing any acks doesn't end up getting to c.workers.Done():

paho.mqtt.golang/client.go

Lines 579 to 586 in 88d5334

case <-c.stop:
	// Attempt to transmit any outstanding acknowledgements (this may well fail but should work if this is a clean disconnect)
	if ackOut != nil {
		for msg := range ackOut {
			commsoboundP <- msg
		}
		c.workers.Done() // matchAndDispatch has completed
	}
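
To illustrate why the branch above might never reach c.workers.Done(), here is a minimal standard-library model. It is only a sketch: drainThenDone, waitTimesOut and simulate are hypothetical names, and it assumes (as the code comment suggests) that ackOut is closed only once matchAndDispatch has finished. A range over a channel does not end until the channel is closed, so if the producer never closes it, Done() is never called and Wait() blocks.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// drainThenDone models the c.stop branch quoted above: it ranges over ackOut
// and calls Done only after the producer has closed the channel.
func drainThenDone(ackOut <-chan int, workers *sync.WaitGroup) {
	for range ackOut {
		// The real code forwards each ack; this model just discards it.
	}
	workers.Done()
}

// waitTimesOut reports whether workers.Wait() is still blocked after d.
func waitTimesOut(workers *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		workers.Wait()
		close(done)
	}()
	select {
	case <-done:
		return false
	case <-time.After(d):
		return true
	}
}

// simulate runs the model with a producer that either closes ackOut
// (handler returned) or never does (handler blocked). In the second case
// the helper goroutines are deliberately left blocked, mirroring the hang.
func simulate(producerCloses bool) bool {
	ackOut := make(chan int)
	var workers sync.WaitGroup
	workers.Add(1)
	go drainThenDone(ackOut, &workers)
	if producerCloses {
		close(ackOut)
	}
	return waitTimesOut(&workers, 100*time.Millisecond)
}

func main() {
	fmt.Println("hangs when ackOut is closed:", simulate(true))
	fmt.Println("hangs when ackOut stays open:", simulate(false))
}
```

In this model the wait only hangs when the channel is never closed, which matches goroutine 88 sitting in chan receive at client.go:582.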

The relevant stacks I'm basing that hypothesis on are below:

goroutine 99 [running]:
testing.(*M).startAlarm.func1()
        /usr/local/go/src/testing/testing.go:1788 +0xbb
created by time.goFunc
        /usr/local/go/src/time/sleep.go:180 +0x4a

goroutine 1 [chan receive, 2 minutes]:
testing.(*T).Run(0xc000501380, {0x3459033, 0x17}, 0x3531160)
        /usr/local/go/src/testing/testing.go:1307 +0x752
testing.runTests.func1(0x0)
        /usr/local/go/src/testing/testing.go:1598 +0x9a
testing.tRunner(0xc000501380, 0xc00065fbf8)
        /usr/local/go/src/testing/testing.go:1259 +0x230
testing.runTests(0xc000126800, {0x5654000, 0x11, 0x11}, {0x0, 0xc00032f340, 0x5781700})
        /usr/local/go/src/testing/testing.go:1596 +0x7cb
testing.(*M).Run(0xc000126800)
        /usr/local/go/src/testing/testing.go:1504 +0x9d2
main.main()
        _testmain.go:75 +0x22c

goroutine 34 [chan receive]:
k8s.io/klog/v2.(*loggingT).flushDaemon(0x0)
        /go/pkg/mod/k8s.io/klog/v2@v2.4.0/klog.go:1169 +0x8b
created by k8s.io/klog/v2.init.0
        /go/pkg/mod/k8s.io/klog/v2@v2.4.0/klog.go:417 +0x1c5

goroutine 52 [chan receive, 2 minutes]:
testing.(*T).Run(0xc000501520, {0xc000058d20, 0x13}, 0xc0001b9410)
        /usr/local/go/src/testing/testing.go:1307 +0x752
github.com/dapr/components-contrib/tests/conformance.(*TestConfiguration).Run(0xc000173fb0, 0xc000501520)
        /go/src/github.com/dapr/components-contrib/tests/conformance/common.go:266 +0x2b7
github.com/dapr/components-contrib/tests/conformance.TestBindingsConformance(0x0)
        /go/src/github.com/dapr/components-contrib/tests/conformance/bindings_test.go:20 +0x94
testing.tRunner(0xc000501520, 0x3531160)
        /usr/local/go/src/testing/testing.go:1259 +0x230
created by testing.(*T).Run
        /usr/local/go/src/testing/testing.go:1306 +0x727

goroutine 53 [chan receive]:
testing.(*T).Run(0xc000501a00, {0x343af9e, 0x5}, 0xc000706c00)
        /usr/local/go/src/testing/testing.go:1307 +0x752
github.com/dapr/components-contrib/tests/conformance/bindings.ConformanceTests(0xc0000522bc, 0xc000039080, {0x423a050, 0xc00066b980}, {0x4240c40, 0xc00066b9e0}, {{{0x343e22c, 0x8}, {0xc0000522bc, 0x4}, ...}, ...})
        /go/src/github.com/dapr/components-contrib/tests/conformance/bindings/bindings.go:260 +0xef2
github.com/dapr/components-contrib/tests/conformance.(*TestConfiguration).Run.func1(0xc000501a00)
        /go/src/github.com/dapr/components-contrib/tests/conformance/common.go:332 +0x10e8
testing.tRunner(0xc000501a00, 0xc0001b9410)
        /usr/local/go/src/testing/testing.go:1259 +0x230
created by testing.(*T).Run
        /usr/local/go/src/testing/testing.go:1306 +0x727

goroutine 97 [chan receive]:
github.com/eclipse/paho%2emqtt%2egolang.(*client).disconnect(0xc000032000)
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:481 +0x45
github.com/eclipse/paho%2emqtt%2egolang.(*client).Disconnect(0xc000032000, 0x1)
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:463 +0x792
github.com/dapr/components-contrib/bindings/mqtt.(*MQTT).Close(0xc00066b980)
        /go/src/github.com/dapr/components-contrib/bindings/mqtt/mqtt.go:328 +0x68
github.com/dapr/components-contrib/tests/conformance/bindings.ConformanceTests.func9(0x0)
        /go/src/github.com/dapr/components-contrib/tests/conformance/bindings/bindings.go:266 +0x207
testing.tRunner(0xc00070b380, 0xc000706c00)
        /usr/local/go/src/testing/testing.go:1259 +0x230
created by testing.(*T).Run
        /usr/local/go/src/testing/testing.go:1306 +0x727

goroutine 66 [syscall]:
os/signal.signal_recv()
        /usr/local/go/src/runtime/sigqueue.go:169 +0x98
os/signal.loop()
        /usr/local/go/src/os/signal/signal_unix.go:24 +0x25
created by os/signal.Notify.func1.1
        /usr/local/go/src/os/signal/signal.go:151 +0x53

goroutine 98 [semacquire]:
sync.runtime_Semacquire(0xc0000321f8)
        /usr/local/go/src/runtime/sema.go:56 +0x25
sync.(*WaitGroup).Wait(0xc0000321f0)
        /usr/local/go/src/sync/waitgroup.go:130 +0xea
github.com/eclipse/paho%2emqtt%2egolang.(*client).stopCommsWorkers.func1()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:672 +0x108
created by github.com/eclipse/paho%2emqtt%2egolang.(*client).stopCommsWorkers
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:670 +0x265

goroutine 87 [chan send]:
github.com/dapr/components-contrib/tests/conformance/bindings.ConformanceTests.func3.1.1(0xc0001b8588)
        /go/src/github.com/dapr/components-contrib/tests/conformance/bindings/bindings.go:167 +0x86
github.com/dapr/components-contrib/bindings/mqtt.(*MQTT).Read.func1({0xc0007060f0, 0x0}, {0x4285e00, 0xc00012e320})
        /go/src/github.com/dapr/components-contrib/bindings/mqtt/mqtt.go:237 +0x254
github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch.func2()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/router.go:210 +0xf03
created by github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/router.go:169 +0x558

goroutine 88 [chan receive]:
github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers.func1()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:582 +0x50d
created by github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:564 +0xa14

goroutine 89 [chan send]:
github.com/eclipse/paho%2emqtt%2egolang.startIncoming.func1()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:131 +0xdb
created by github.com/eclipse/paho%2emqtt%2egolang.startIncoming
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:117 +0x2b9

goroutine 90 [chan send]:
github.com/eclipse/paho%2emqtt%2egolang.startIncomingComms.func1()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:219 +0xdd9
created by github.com/eclipse/paho%2emqtt%2egolang.startIncomingComms
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:159 +0x2fa

goroutine 91 [select]:
github.com/eclipse/paho%2emqtt%2egolang.startOutgoingComms.func1()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:272 +0x326
created by github.com/eclipse/paho%2emqtt%2egolang.startOutgoingComms
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:259 +0x3da

goroutine 92 [chan send]:
github.com/eclipse/paho%2emqtt%2egolang.startComms.func1()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:411 +0xe5
created by github.com/eclipse/paho%2emqtt%2egolang.startComms
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:400 +0x3c8

goroutine 93 [chan receive]:
github.com/eclipse/paho%2emqtt%2egolang.startComms.func2()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:424 +0x7d
created by github.com/eclipse/paho%2emqtt%2egolang.startComms
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:423 +0x4d4

goroutine 94 [semacquire]:
sync.runtime_Semacquire(0xc0000c0554)
        /usr/local/go/src/runtime/sema.go:56 +0x25
sync.(*WaitGroup).Wait(0xc0000c0554)
        /usr/local/go/src/sync/waitgroup.go:130 +0xea
github.com/eclipse/paho%2emqtt%2egolang.startComms.func3()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:432 +0x3e
created by github.com/eclipse/paho%2emqtt%2egolang.startComms
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:431 +0x585

goroutine 95 [select]:
github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers.func2()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:613 +0x5a5
created by github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:597 +0xcf4
@MattBrittan
Contributor

MattBrittan commented Oct 26, 2021

It looks like your issue is here:

goroutine 87 [chan send]:
github.com/dapr/components-contrib/tests/conformance/bindings.ConformanceTests.func3.1.1(0xc0001b8588)
        /go/src/github.com/dapr/components-contrib/tests/conformance/bindings/bindings.go:167 +0x86
github.com/dapr/components-contrib/bindings/mqtt.(*MQTT).Read.func1({0xc0007060f0, 0x0}, {0x4285e00, 0xc00012e320})
        /go/src/github.com/dapr/components-contrib/bindings/mqtt/mqtt.go:237 +0x254
github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch.func2()
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/router.go:210 +0xf03
created by github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/router.go:169 +0x558

router.go:210 is:

handler(client, m) // This calls your handler
m.Ack()

So one of your handlers is not returning. As per common problems in the readme:

  • Unless ordered delivery of messages is essential (and you have configured your broker to support this e.g. max_inflight_messages=1 in mosquitto) then set ClientOptions.SetOrderMatters(false). Doing so will avoid the below issue (deadlocks due to blocking message handlers).
  • A MessageHandler (called when a new message is received) must not block (unless ClientOptions.SetOrderMatters(false) set). If you wish to perform a long-running task, or publish a message, then please use a go routine (blocking in the handler is a common cause of unexpected pingresp not received, disconnecting errors).
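
As a rough illustration of the "use a go routine" advice, the sketch below uses only the standard library. Handler, Async, returnsWithin and demo are hypothetical names, and Handler is not the library's MessageHandler type. It shows a callback that sends on an unbuffered, unread channel: called directly it never returns, while the wrapped version returns at once and leaves the blocking send on its own goroutine.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Handler is a hypothetical stand-in for a message callback.
type Handler func(payload string)

// Async wraps h so that the caller regains control immediately; the work
// runs on a separate goroutine tracked by wg. Ordering is not preserved.
func Async(wg *sync.WaitGroup, h Handler) Handler {
	return func(payload string) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h(payload)
		}()
	}
}

// returnsWithin reports whether calling h returns before d elapses.
func returnsWithin(h Handler, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		h("payload")
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// demo reports whether a handler that sends on an unbuffered, unread
// channel returns promptly when called directly and when wrapped.
func demo() (direct, wrapped bool) {
	results := make(chan string) // unbuffered and, at first, unread
	var wg sync.WaitGroup
	send := func(p string) { results <- p }

	direct = returnsWithin(send, 100*time.Millisecond)
	wrapped = returnsWithin(Async(&wg, send), 100*time.Millisecond)

	// Drain both pending sends so no goroutine stays blocked.
	<-results
	<-results
	wg.Wait()
	return direct, wrapped
}

func main() {
	direct, wrapped := demo()
	fmt.Println("direct handler returned:", direct)
	fmt.Println("wrapped handler returned:", wrapped)
}
```

The trade-off is that each message gets its own goroutine, so ordering is lost and the number of in-flight goroutines is unbounded unless the caller limits it.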

I'm really not sure that the library can handle this any differently. Disconnect aims for a clean shutdown: we either block (like this), which exposes misbehaving user handlers, or potentially leak resources by not waiting for them to complete.
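
For callers who would rather accept the leak than block, the trade-off can be made on the caller's side. The following is a sketch under stated assumptions, not something the library provides: callWithTimeout and demo are hypothetical names, and the stuck function merely stands in for a Disconnect call that never returns.

```go
package main

import (
	"fmt"
	"time"
)

// callWithTimeout runs fn on its own goroutine and reports whether it
// finished within d. If it did not, that goroutine keeps running and is
// leaked, along with anything it holds.
func callWithTimeout(fn func(), d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		fn()
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// demo tries one call that returns at once and one that never returns.
func demo() (quickOK, stuckOK bool) {
	quick := func() {}
	stuck := func() { select {} } // stands in for a call that never returns
	return callWithTimeout(quick, time.Second),
		callWithTimeout(stuck, 100*time.Millisecond)
}

func main() {
	quickOK, stuckOK := demo()
	fmt.Println("quick call finished:", quickOK)
	fmt.Println("stuck call finished:", stuckOK)
}
```

This only hides the symptom: the blocked handler and the goroutines waiting on it are still there, so fixing the handler remains the real remedy.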

@CodeMonkeyLeet
Author

I see, thanks for the pointer! It turns out that the handler is signaling on a channel that is insufficiently deep and blocking as a result.

To elaborate further on the scenario, I have an adapter around MQTT that may be passed arbitrary handler functions and I would like to be able to recover the client by stopping and bringing up new instances of it. Let's say I wrap the handler to include a cancellation context controlled by the adapter. When I cancel the context and call Disconnect, what resources, if any, would be leaked in the mqtt lib by this approach?
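
A minimal sketch of that wrapping idea, using only the standard library: Handler, forward and demo are hypothetical names for illustration, not part of the mqtt lib. The handler selects between the channel send and the context, so cancelling the context forces it to return even when nobody is reading.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Handler is a hypothetical callback type for illustration only.
type Handler func(ctx context.Context, payload string) error

// forward returns a handler that delivers payloads to out but gives up as
// soon as ctx is cancelled, so an unread channel cannot block it forever.
func forward(out chan<- string) Handler {
	return func(ctx context.Context, payload string) error {
		select {
		case out <- payload:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demo calls the handler against an unread channel, cancels the context
// shortly afterwards, and returns the handler's error.
func demo() error {
	ctx, cancel := context.WithCancel(context.Background())
	out := make(chan string) // nobody reads from this channel
	time.AfterFunc(20*time.Millisecond, cancel)
	return forward(out)(ctx, "hello")
}

func main() {
	fmt.Println(demo()) // context canceled
}
```

An adapter built this way would cancel the context first and call Disconnect afterwards, so that every in-flight callback has a way out before the client waits for its workers.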

On a related note, while the handler seen by the mqtt lib does complete in this case (cancellation forces progress), it looks like router.go:210 always calls Ack() on the message once the handler completes. Since the handler() definition doesn't return an error, I was under the impression that the handler would be responsible for explicitly acking on success, but that doesn't seem to be the case. If a handler is cancelled, the message is presumably lost (mqtt delivers the outbound ack before disconnect and the message is never redelivered). Do you have any recommendations here?

@MattBrittan
Contributor

> When I cancel the context and call Disconnect, what resources, if any, would be leaked in the mqtt lib by this approach?

None if it shuts down properly (i.e. cancelling your context also terminates any callbacks). If the callback is blocked then goroutines are leaked (along with any resources they hold). Due to the way this library was written (over a long period by quite a few people) it's not particularly easy to modify, and keeping track of the various goroutines and what blocks what is difficult. This is one of the reasons that the mqtt v5 client is a total (incompatible) rewrite. I rewrote a large portion of this library a couple of years ago (simplifying, documenting, and removing a range of potential deadlocks) but was constrained by the need to avoid breaking existing clients.

> I was under the impression that the handler would be responsible for explicitly acking on success, but that doesn't seem to be the case.

See issue #459 for a discussion on this point (there was also a discussion on the Paho mailing list). This package was originally designed on the basis that the ACK is a protocol-level thing (so once the message has been received the ACK should be sent regardless of any issues the user may have processing the message). As per the comments on that issue, I'm open to seeing this change implemented, but it's not something I'm really interested in doing myself (I have no use case for it).

@CodeMonkeyLeet
Author

Thanks @MattBrittan, this was incredibly helpful. I'll keep an eye on eclipse/paho.golang#53 and the v5 betas.
