
token.Wait() does not wait for completion #563

Open
ecksun opened this issue Dec 3, 2021 · 6 comments

@ecksun
Contributor

ecksun commented Dec 3, 2021

I'm trying to use this library in a test. What I want to do is send some messages and then exit. My broker disconnects the MQTT connection if I try to send messages too early, so I want to retry until I know the message has been delivered, and only exit once that is done.

The issue is that token.Wait() doesn't always wait until the message is actually delivered. Reading through the issues, especially this comment from @MattBrittan, it seems like this is "intended" behavior.

I'm using paho.mqtt.golang v1.3.5

Here is some code to reproduce the issue:

package main

import (
	"fmt"
	"log"
	"os"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
	mqtt.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
	mqtt.WARN = log.New(os.Stdout, "[WARN]  ", 0)
	mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://localhost:1883").
		SetClientID("tester").
		SetUsername("hunter")

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// Publish at QoS 1; the expectation is that Wait() blocks until the PUBACK.
	token := c.Publish("test", 1, false, []byte{})
	token.Wait()
	if token.Error() != nil {
		fmt.Printf("token.Error() = %+v\n", token.Error())
	}

	c.Disconnect(250)
}

and the logs:

[DEBUG] [client]   Connect()
[DEBUG] [store]    memorystore initialized
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [pinger]   keepalive starting
[DEBUG] [client]   startCommsWorkers done
[DEBUG] [net]      outgoing waiting for an outbound message
[WARN]  [store]    memorystore wiped
[DEBUG] [client]   exit startClient
[DEBUG] [client]   enter Publish
[DEBUG] [client]   sending publish message, topic: test
[DEBUG] [net]      obound msg to write 1
[DEBUG] [net]      obound wrote msg, id: 1
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      incoming complete
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: ibound complete
[DEBUG] [net]      startIncomingComms goroutine complete
[DEBUG] [net]      outgoing waiting for an outbound message
[ERROR] [client]   Connect comms goroutine - error triggered EOF
[DEBUG] [client]   internalConnLost called
[DEBUG] [client]   stopCommsWorkers called
[DEBUG] [pinger]   keepalive stopped
[DEBUG] [router]   matchAndDispatch exiting
[DEBUG] [client]   stopCommsWorkers waiting for workers
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [client]   internalConnLost waiting on workers
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      outgoing comms stopping
[DEBUG] [net]      startComms closing outError
[DEBUG] [client]   startCommsWorkers output redirector finished
[DEBUG] [client]   incoming comms goroutine done
[DEBUG] [client]   stopCommsWorkers waiting for comms
[DEBUG] [client]   stopCommsWorkers done
[DEBUG] [client]   internalConnLost workers stopped
[DEBUG] [client]   internalConnLost complete
[DEBUG] Connection lost: EOF
[DEBUG] [client]   enter reconnect
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [pinger]   keepalive starting
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [client]   startCommsWorkers done
[DEBUG] [store]    enter Resume
[DEBUG] [store]    memorystore get: message 1 found
[DEBUG] [store]    loaded pending publish (1)
[DEBUG] [client]   disconnecting
[DEBUG] [store]    {1 1}
[DEBUG] [store]    exit resume
[DEBUG] [client]   calling WaitTimeout
[DEBUG] [net]      obound priority msg to write, type *packets.DisconnectPacket
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      outbound wrote disconnect, closing connection
[DEBUG] [client]   WaitTimeout done
[DEBUG] [client]   stopCommsWorkers called
[DEBUG] [client]   stopCommsWorkers waiting for workers
[DEBUG] [pinger]   keepalive stopped
[DEBUG] [net]      incoming complete
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncomingComms: ibound complete
[DEBUG] [net]      startIncomingComms goroutine complete
[DEBUG] [net]      obound msg to write 1
[DEBUG] [router]   matchAndDispatch exiting
[ERROR] [net]      outgoing obound reporting error  write tcp 127.0.0.1:35620->127.0.0.1:1883: use of closed network connection
[DEBUG] [client]   startCommsWorkers output redirector finished
[DEBUG] [client]   stopCommsWorkers waiting for comms
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      outgoing comms stopping
[DEBUG] [net]      startComms closing outError
[DEBUG] [client]   incoming comms goroutine done
[DEBUG] [client]   stopCommsWorkers done
[DEBUG] [client]   forcefully disconnecting
[DEBUG] [msgids]   cleaned up
[DEBUG] [client]   disconnected
[DEBUG] [store]    memorystore closed

My understanding of what is happening is this:

  1. Try to publish message to broker
  2. Broker sends EOF
  3. Client reconnects
  4. c.claimID(token, details.messageId) is called here, resulting in the original token being considered complete
  5. c.Disconnect(250) is called as token.Wait() completed successfully

To solve my immediate issue I will try to disable auto-reconnect and handle reconnects myself.

Would it not make sense to have token.Wait() wait until the message is actually delivered, regardless of reconnects?

At the very least, documenting the caveat would be nice.

@MattBrittan
Contributor

MattBrittan commented Dec 4, 2021

comment from @MattBrittan, it seems like this is "intended" behavior.

I'm not sure I'd go quite that far - I'm guessing that it was intended at some point in the past (possibly when persistence was initially introduced), and changing it now is likely to break existing usages. When this library was originally created, tokens were seen as a good idea, but that is no longer the case (which is why the v5 client is a complete rewrite).

This behaviour does make some sense because what should matter to the client is that messages will be delivered. Whether this means that they have been transmitted, or stored on the client itself for transmission in the future, should not really be the concern of the user (because the library automates this). I accept that this can be confusing, but returning an error would also be confusing (because that could be taken to indicate that the message will not be sent, when in fact it will be resent once the client reconnects).

In your case you are attempting to work around a buggy broker and a slightly unusual requirement (connecting to deliver one message). There are a few options:

  • options.SetCleanSession(true) and options.SetAutoReconnect(false) - if Clean Session is true then you will get an error when the connection is lost (which makes sense, because no attempt will be made to transmit the message if the connection is re-established). You would need to handle reconnection yourself in this case.
  • Use a custom store that enables you to see what is queued. You could take the existing memory store (the default store) and add a callback for when the store becomes empty. You could then use that to trigger the call to Disconnect (and use options.SetAutoReconnect(true) so the client will just keep trying). As the message will only be removed from the store when the ACK is received, this should accomplish your goal; see the sketch after this list.
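
A minimal sketch of that second option, assuming the exported Store interface and mqtt.NewMemoryStore from v1.3.x (notifyStore and onEmpty are names made up for this example, not part of the library):

import mqtt "github.com/eclipse/paho.mqtt.golang"

// notifyStore wraps the default memory store and fires a callback
// whenever a deletion leaves the store empty, i.e. every queued
// message has been ACKed and removed.
type notifyStore struct {
	mqtt.Store
	onEmpty func()
}

func (s *notifyStore) Del(key string) {
	s.Store.Del(key)
	// Caveat: Del is also called for inbound message keys, so a real
	// implementation may want to count only outbound keys here.
	if len(s.Store.All()) == 0 && s.onEmpty != nil {
		s.onEmpty()
	}
}

You would then wire it up with opts.SetStore(&notifyStore{Store: mqtt.NewMemoryStore(), onEmpty: func() { close(done) }}) and wait on done before calling Disconnect.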

@ecksun
Copy link
Contributor Author

ecksun commented Dec 6, 2021

This behaviour does make some sense because what should matter to the client is that messages will be delivered

Yes, but that is not happening at the moment. The library is saying the message has been delivered, yet no message has been sent.

I accept that this can be confusing, but returning an error would also be confusing (because that could be taken to indicate that the message will not be sent, when in fact it will be resent once the client reconnects).

Well, the error should not be returned when the client reconnects, but rather when the message fails to be delivered (or .Wait should hang forever if it can never be delivered).

...and changing it now is likely to break existing usages

Yeah, and if that is the case then we could document the behavior instead, as it was quite difficult to understand, at least for me.

In your case you are attempting to work around a buggy broker

Perhaps, but the broker is working as intended (as I intended it :D)

I managed to make a working solution with options.SetAutoReconnect(false), as that made the token return an error and allowed me to retry myself.
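
For anyone hitting the same thing, a rough sketch of that approach, assuming options.SetCleanSession(true) and options.SetAutoReconnect(false) as suggested above (publishWithRetry is a hypothetical helper and the one-second delay is arbitrary):

import (
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// publishWithRetry (a made-up helper, not part of the library) keeps
// reconnecting and republishing until a publish token completes without
// error. With auto-reconnect disabled, a dropped connection surfaces as
// a token error instead of being hidden by a silent reconnect. Beware:
// retrying a QoS 1 publish can produce duplicates if the broker did
// receive an earlier attempt.
func publishWithRetry(c mqtt.Client, topic string, payload []byte) {
	for {
		if !c.IsConnected() {
			if t := c.Connect(); t.Wait() && t.Error() != nil {
				time.Sleep(time.Second) // arbitrary retry delay
				continue
			}
		}
		if t := c.Publish(topic, 1, false, payload); t.Wait() && t.Error() == nil {
			return // PUBACK received; safe to Disconnect
		}
		time.Sleep(time.Second)
	}
}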

@MattBrittan
Contributor

MattBrittan commented Dec 6, 2021

Yeah, and if that is the case then we could document the behavior instead, as it was quite difficult to understand, at least for me.

Thanks for the pull request - I'll review shortly.

Yes, but that is not happening at the moment. The library is saying the message has been delivered, yet no message has been sent.

The challenge here is that the library has heaps of options, which makes documenting this kind of thing fairly complicated.

When using a filestore (recommended if you want guaranteed delivery), QoS 1+ messages will be persisted to disk as soon as Publish is called. Such messages will then be delivered at some point in the future (assuming options.SetCleanSession(false)); this is the case even if the application is shut down and restarted (because the message will be read from disk). In this case returning an error when disconnecting is not appropriate.
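
For reference, a minimal sketch of that configuration (the store directory is a placeholder):

opts := mqtt.NewClientOptions().
	AddBroker("tcp://localhost:1883").
	SetClientID("tester").
	SetCleanSession(false). // keep the session so queued messages survive restarts
	SetStore(mqtt.NewFileStore("/var/lib/mqtt-store")) // persist QoS 1+ messages to disk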

In your situation you are using the memstore; this is only active as long as the application is running. The main library appears to have been written with the assumption that the store is persistent, which is why no error is returned when the connection is lost.

Well, the error should not be returned when the client reconnects, but rather when the message fails to be delivered (or .Wait should hang forever if it can never be delivered).

Unfortunately this exposes one of the issues with the whole token approach. A frequent usage pattern is to kick off a goroutine to monitor the token, so leaving the token hanging is likely to result in goroutine leaks, which we really want to avoid! This means that, as written, the token needs to complete (either with an error or without), and neither option is great in this situation. I would add that the token does not need to complete when the client will initiate a reconnect (and it would be nice if completion only happened when no reconnect would be attempted).
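
For context, the pattern being described looks something like the sketch below (monitorPublish is a made-up name; Token.WaitTimeout at least lets callers bound the wait):

import (
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// monitorPublish shows the fire-and-monitor pattern: a goroutine waits
// on the token, so a token that never completes would leak the
// goroutine. WaitTimeout returns false if the token did not complete
// within the given duration.
func monitorPublish(c mqtt.Client, topic string, payload []byte) {
	go func() {
		t := c.Publish(topic, 1, false, payload)
		if t.WaitTimeout(30*time.Second) && t.Error() != nil {
			log.Println("publish failed:", t.Error())
		}
	}()
}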


Looking at the code further, the situation is more complex than I had thought. In many situations (where auto-reconnect is not used) the tokens will just hang on loss of connection, because c.messageIds.cleanUp() is not called unless c.options.CleanSession && !reconnect. The store is not closed either; that makes sense to a degree, but it should then be closed if the user calls Disconnect, and due to the way this is written even a call to Disconnect will not close the store, because the CommsWorkers have already been stopped. This whole area could do with some work, but the first step would be to determine what should happen here (as mentioned above, raising an error is not necessarily the appropriate option).

I'm also not sure how much work to put into this; I'd rather put available time (not much at the moment) into adding support for persistence into the v5 client (which does not use tokens and is a lot simpler to work with because of this!).

@ecksun
Contributor Author

ecksun commented Dec 8, 2021

Alright, I see. If I understand you correctly, it seems like the design of the library (with persistence) makes it very tricky to know whether messages have been delivered or not. Based on my rather limited understanding of the library, perhaps this could be outsourced to the library user by notifying them of all messageIds that have been delivered?

Looking at the code further, the situation is more complex than I had thought

Yeah, for sure. I was almost about to suggest we add another option that disables the filestore :)

I'm also not sure how much work to put into this

Not much, I'd say. It is clear this is a very tricky situation, and it is manageable by simply handling reconnects manually. I think this issue and the added caveat in the README are more than enough.

persistence into the v5 client

Personally I haven't spent time figuring out whether migrating to MQTT v5 is always worthwhile; I guess I should look into that.

That said, given what you just said, it sounds like persistence is very tricky; is it worth adding that complexity to the v5 client also?

@MattBrittan
Contributor

If I understand you correctly, it seems like the design of the library (with persistence) makes it very tricky to know whether messages have been delivered or not

Kind of. The issue is that if persistence (using a persistent store - not memstore) is enabled, then the message should be delivered eventually (even after a restart). If an error is returned, then some users may assume this means they need to resend their message, which would lead to duplicates.

Yeah, for sure. I was almost about to suggest we add another option that disables the filestore :)
is it worth adding that complexity to the v5 client also?

Persistence is part of the MQTT spec, so it's something that a full client implementation needs - without it you cannot trust QoS 1+ delivery. In many use-cases it really simplifies things. For example, I generally use MQTT as a transport for messages from a variety of monitoring systems; I want to be able to call Publish and know that the message will get through eventually (as a user of the library I don't want to have to do anything further to track delivery). As such I will not be migrating to the V5 client until persistence is implemented.

If the library does not support this then users need to implement it themselves and, given the complexity, there are likely to be bugs in their implementations. It seems better to have an implementation within the library so that we all benefit from each other's efforts to improve it (I've put a lot of work into persistence on the V3 client because it was something I needed).

The plan with the v5 client is to implement persistence as an add-on so that the complexity is kept out of the main library. Unfortunately coming up with a design that achieves this is taking a lot longer than I hoped!

@MattBrittan
Contributor

The V5 client now offers session persistence (if you use the master branch; we hope to make a release this month).

I'm going to leave this issue open; if anyone wants to work on a solution to the issues mentioned then they are welcome to have a go (but probably best to discuss the proposed solutions first).
