
[WIP] Async/Await Prototype #70

Open: wants to merge 22 commits into master

Conversation

@liamdiprose commented Nov 16, 2019

Work in progress - Don't merge yet!

This PR presents an alternative API that makes full use of Python's async/await feature, like other well-known Python async libraries such as websockets and aiohttp. The client connection is handled by an async context manager, and event handling is achieved with the await keyword instead of callback functions:

async with gmqtt.connect('iot.eclipse.org') as client:
    await client.publish('test/message', 'hello world', qos=1)
    message = await client.subscribe('test/message')

Converting callbacks to awaitables

Callbacks can be converted to awaitable objects, which simplifies the user's code significantly:

Before
def on_message(client, topic, payload, qos, properties):
    print('RECV MSG:', payload)

client.on_message = on_message
client.subscribe('TEST/#', qos=0)

After
message = await client.subscribe('TEST/#', qos=0)
print('RECV MSG:', message.payload)

I also spotted a few areas in the codebase that make heavy use of callbacks and could be simplified using this method.

How

The various callback functions are made awaitable with an asyncio.Future object. This is how the on_connect callback can be adapted:

async def connect(self, broker_host) -> MqttClientProtocol:
    future = self.loop.create_future()

    def _on_connect(client, flags, rc, properties):
        future.set_result(client)
    self.client.on_connect = _on_connect

    await self.client.connect(broker_host)
    return await future

Progress

  • multiple threading / possible concurrency errors (Future isn't threadsafe)
  • subscriptions are long living, but treated as returning a single message
  • Routing messages to subscriptions using their topic filter
  • any other functions in the API (currently only publish and subscribe are implemented)
  • Errors and Exceptions
  • How the API relates to the callback API; Should this be a wrapper or a replacement? Wrapper
  • gmqtt.connect Arguments: Encryption, Authentication, Reconnection
  • Decide on one of the three flow-control options, or something else I haven't considered
  • Add unit tests that properly test the new API.
  • Docstrings for public classes and functions
  • Use API as the example in the Readme?

@Mixser (Contributor) left a comment

multiple threading / possible concurrency errors (Future isn't threadsafe)

For which purpose do you want to use threading?

Yes, you are right that Future is not threadsafe, but in my opinion this is the user's problem, not the library's: a user can easily shoot themselves in the foot by using one event loop across several threads and forgetting to call run_coroutine_threadsafe.

self.loop = loop

self.cbclient = inner_client
self.message_queue = asyncio.Queue()
@Mixser (Contributor) commented Nov 18, 2019

This is dangerous: if your reading code is blocked for a long time, the queue will grow without bound, producing high memory usage and eventually memory overflow. So it would be nice to limit the queue via an option in the __init__ method.
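For illustration, a minimal sketch of a bounded queue behind an __init__ option (the max_queued_messages name is hypothetical, not part of this PR):

import asyncio

class AsyncClient:
    def __init__(self, inner_client, loop, max_queued_messages=0):
        self.loop = loop
        self.cbclient = inner_client
        # maxsize=0 keeps the current unbounded behaviour; a positive
        # value caps how many messages can sit in the buffer.
        self.message_queue = asyncio.Queue(maxsize=max_queued_messages)

With a bound set, put() waits (and put_nowait() raises QueueFull) once the buffer is full, instead of growing without limit.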

@liamdiprose (Author) replied

Good point. I just looked at the flow control section of the MQTT spec. The broker won't send more messages than the client's send quota, which is initially set to receive_maximum; I imagine this is so the client's buffer doesn't overflow. The quota is incremented by a PUBACK; should we send this only once the message has been delivered to the user (popped off the queue)?

@liamdiprose (Author) replied

There might already be something like this in the codebase. I'll have a look around.

@Mixser (Contributor) replied

Yes, receive_maximum may be a good solution, but it only works for qos1 and qos2, not for qos0.

The quota is incremented by a PUBACK; should we send this only once the message has been delivered to the user?

In the current implementation you may choose when the PUBACK is sent (by setting optimistic_acknowledgement to True or False): either right after we receive the message (before the user processes it in the on_message callback), or after it has been processed by the on_message callback, using the callback's result for the PUBACK.

So in my opinion it would be good to keep this behaviour and let the user choose when the PUBACK is sent.

@liamdiprose (Author) replied

Sweet. I chased down the optimistic_acknowledgement setting and found what you described: the PUBACK is sent before or after the on_message callback has run. I think I understand the use case for optimistic acknowledgements now.

Without optimistic acknowledgments, sub1 will continue to receive messages but will never respond with a PUBACK:

async with gmqtt.connect(...) as client:
    sub1 = client.subscribe(...)
    async for message in client.subscribe(...):
        print(message)

I used to think the lack of PUBACKs would prompt a retransmission, but that doesn't seem to be the case. Section 4.4 of the spec says:

When a Client reconnects with Clean Start set to 0 and a session is present, both the Client and Server MUST resend any unacknowledged PUBLISH packets (where QoS > 0) and PUBREL packets using their original Packet Identifiers. This is the only circumstance where a Client or Server is REQUIRED to resend messages. Clients and Servers MUST NOT resend messages at any other time.

This means the broker won't retransmit packets like I thought. A PUBACK is a notification that the client has "taken ownership" of the message: the point where we are happy to let the broker forget it. To me, that means once the message is delivered to the user. Optimistic acknowledgements just disable the flow control feature of MQTT (designed to keep our buffer from overflowing) and save the broker from storing messages while they sit in our buffer, an extremely light burden in most cases.

I'm happy to add the option for optimistic acknowledgments; it should just be a matter of passing the parameter through to the standard client. I don't think I'd recommend the feature to most people, though. Perhaps you can convince me otherwise? 😛

Currently, __handle_publish_callback is run after on_message has returned:

gmqtt/gmqtt/mqtt/handler.py, lines 329 to 330 in 4fb92a9:

run_coroutine_or_function(self.on_message, self, print_topic, packet, 2, properties,
                          callback=partial(self.__handle_publish_callback, qos=2, mid=mid))

For non-optimistic acknowledgements, the library should send the PUBACK as soon as the message is delivered to the user.

My wrapper will need a special case where the __handle_publish_callback is passed to the on_message handler rather than run after it completes.

Subclassing Client is one option.
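For example, a minimal sketch of that special case, assuming run_coroutine_or_function fires the callback once the coroutine completes (as the snippet above suggests); the wrapper-side names here are hypothetical:

async def _on_message(self, client, topic, payload, qos, properties):
    # gmqtt runs __handle_publish_callback (and so the PUBACK) only after
    # this coroutine finishes, so waiting here defers the acknowledgement.
    delivered = self.loop.create_future()
    await self.message_queue.put((topic, payload, delivered))
    # receive() resolves `delivered` once the user has taken the message.
    await delivered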

@liamdiprose (Author) commented

@Mixser Thanks for the review. I agree with all of your points and have corrected most of them.

Threading

I was unsure whether threading was going to be an issue (I have trust issues with callback functions). It sounds like threads aren't something we need to worry about, which is great.

Flow Control

You raised a good point about the infinitely-sized queue causing a possible memory overflow. MQTT has a flow-control mechanism to prevent client buffer overflow, but it depends on sending the PUBACK response only once the message has been removed from the queue and delivered to the user. Here are the solutions I can think of:

  1. Remove my queue and have the message buffering handled by the current client, which I assume handles flow-control.
  2. Modify the client to let the wrapper choose when the PUBACK is sent.
  3. Ignore MQTT's flow control spec, restrict the message queue size and error on overflow.

Option 1 sounds preferable as this is a wrapper class, but I'm unsure if it can be implemented with the callback API you have.

  • Is there any way to stop the on_message callback from firing so the internal queue starts filling up?

If not, I imagine it will be easier to implement Option 2. In my opinion, Option 3 should not be considered, as it breaks the spec and could lead to qos>0 packets being lost 😬.

We will also have to drop qos=0 packets on an overfull queue, as MQTT's flow control doesn't account for them. This might already be implemented elsewhere in this project.

Subscriptions

I rethought the subscription part of the API and implemented an object-oriented alternative that better reflects the long-lived nature of subscriptions and removes the ambiguity you demonstrated. Here it's obvious which topic subscription a message will come from:

async with gmqtt.connect('iot.eclipse.org') as client:
    sub = await client.subscribe('test1/#')
    sub2 = await client.subscribe('test2/#')
    message = await sub2.receive()
    await sub2.unsubscribe()  # optional

I also implemented the subscribe method as a context manager that unsubscribes on exit:

async with gmqtt.connect('iot.eclipse.org') as client:
    async with client.subscribe('test1/#') as sub:
        print(await sub.receive())

And made the subscription an async generator, as I imagined in my initial issue #69 🎉

import gmqtt.aioclient as gmqtt
async with gmqtt.connect('iot.eclipse.org') as client:
    async with client.subscribe('test1/#') as sub:
        async for message in sub:
            print(message.payload)
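For reference, a minimal sketch of how such a subscription object can support both receive() and async iteration, assuming it wraps a per-subscription asyncio.Queue (names are illustrative, not necessarily the PR's actual code):

import asyncio

class Subscription:
    def __init__(self):
        self._queue = asyncio.Queue()

    async def receive(self):
        return await self._queue.get()

    def __aiter__(self):
        return self

    async def __anext__(self):
        # A full implementation would raise StopAsyncIteration
        # once the subscription is closed by unsubscribe().
        return await self.receive()

Returning self from __aiter__ and delegating __anext__ to receive() is what makes the async for loop above work.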

Next Steps

Additional functions

I'd like to wrap up everything connection-related into the gmqtt.connect function, as it makes gmqtt hard to misuse. I'll probably add the following as arguments:

  • Encryption
  • Authentication
  • Reconnection

Is there any other functionality I'm missing?

Flow control

  • Decide on one of the three options, or something else I haven't considered

Testing

I imagine I can mock the existing client and test that my wrapper calls the right functions.

  • Add unit tests that properly test the new API.
  • Doctests might be nice as this is user-facing code.

Documentation

  • Docstrings for public classes and functions
  • Use the API as the example in the Readme?

if "+" in level:
raise ValueError("Single-level wildcard (+) only allowed by itself")

def match(self, topic: str) -> bool:
@Mixser (Contributor) commented

What do you think about using a regexp?
For example, if a user passes a topic filter ("root/sub1/+/sub3"), we can transform it into a regexp ("root/sub1/([^/]+)/sub3") and the code will be clearer.

Replacements: + -> ([^/]+), # -> (.+)
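For illustration, one way the translation could look, built per level to match the existing TopicFilter.levels split (a sketch, not the PR's code):

import re

def filter_to_regex(topic_filter: str):
    # '+' matches exactly one topic level, '#' matches the rest of the topic.
    parts = []
    for level in topic_filter.split("/"):
        if level == "+":
            parts.append(r"([^/]+)")
        elif level == "#":
            parts.append(r"(.+)")
        else:
            parts.append(re.escape(level))
    return re.compile("^" + "/".join(parts) + "$")

assert filter_to_regex("root/sub1/+/sub3").match("root/sub1/foo/sub3")
assert not filter_to_regex("root/sub1/+/sub3").match("root/sub1/a/b/sub3")

Anchoring with ^ and $ keeps "root/sub1" from matching longer topics by prefix.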

@liamdiprose (Author) replied

Yes, I am considering switching this to a regexp; I wonder how much faster it'll be.

I'm also considering using a tree structure to store the subscriptions, which I think will find all matching subscriptions with less work.

@Mixser (Contributor) replied

A tree structure will only give us a speed boost if you have hundreds of subscriptions. For fewer than ten (in my opinion) it will just be overhead for iterating over and managing the structure in memory.

But it's interesting, and you could implement it and compare it against the for-loop and regexp-matching implementations.

# if over, attempt to drop qos=0 packet using `drop_policy`

# if under, add to appropriate queues:
for subscription_topic in self.subs:
@Mixser (Contributor) commented

It's all good, but we can reduce the number of indents and make the code more beautiful:

for topic in filter(lambda x: x.match(message.topic), self.subs):
    for sub in self.subs[topic]:
        sub.put_nowait(message)

or

for subscription_topic in self.subs:
    if not subscription_topic.match(message.topic):
        continue
    for subscription in self.subs[subscription_topic]:
        subscription.put_nowait(message)

if "+" in level:
raise ValueError("Single-level wildcard (+) only allowed by itself")

def match(self, topic: str) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tree structure will give us a speed boost if only you will have hundred of subscriptions. For a less then ten (in my opinion) it will be only overhead for iterating and managing this structure in memory.

But it's interesting and you may implement it and compare with for-loop and matching by regexp implementation.

receive_maximum = receive_maximum or 65665 # FIXME: Sane default?

self.subscription_manager = SubscriptionManager(receive_maximum)
# self.message_queue = asyncio.Queue(maxsize=receive_maximum or 0)
@Mixser (Contributor) commented

Let's remove all old comments.

class TopicFilter:
    def __init__(self, topic_filter: str):
        self.levels = topic_filter.split("/")
        TopicFilter.validate_topic(self.levels)
@Mixser (Contributor) commented

Do you have a reason to call validate_topic via the class name TopicFilter?

It will produce unexpected behavior if you inherit from it: a subclass that wants to override validate_topic will also have to override __init__, call super().__init__, and call validate_topic manually, because the base __init__ is pinned to TopicFilter's version:

class B(TopicFilter):
    @staticmethod
    def validate_topic(*args):
        print("Never happens")
        return TopicFilter.validate_topic(*args)

B("my-awesome-topic/bla")  # the override above is never called

@frederikaalund mentioned this pull request Apr 6, 2020
@okaestne commented Nov 7, 2023

Hi! Is there still any interest in getting this PR ready to be merged? I'm currently implementing my own asyncio wrapper around this library. My main motivations are to be able to:

  1. await the result of (un)subscriptions
  2. call specialized callbacks for each subscription

I use this wrapper to implement the command-response pattern using the MQTT5 response topic/correlation data headers.

For 1) I'm simply tracking the mid and resolving an asyncio.Future on a matching (Un)SubAck.
For 2) I'm assigning a (random) subscription identifier (implies MQTT5), maintaining a sub_id -> callback mapping and filtering incoming messages to call the callbacks or fall back to the default on_message callback.
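For illustration, the mid-tracking part of 1) could look roughly like this (the names are mine, not the actual wrapper's):

import asyncio

class AckTracker:
    def __init__(self, loop):
        self._loop = loop
        self._pending = {}  # mid -> asyncio.Future

    def track(self, mid):
        future = self._loop.create_future()
        self._pending[mid] = future
        return future

    def on_suback(self, mid, granted_qos):
        # Called from the client's SUBACK handler.
        future = self._pending.pop(mid, None)
        if future is not None and not future.done():
            future.set_result(granted_qos)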

I think the two goals are common enough that this library could support its users by providing an appropriate (async) interface. I may be able to contribute some of the necessary changes either to this PR or separately if you are open to alternative approaches. I'd just like to ask you about your general opinion on the overall style (i.e. full asyncio wrapper as in this PR or additional _async methods?) and requirements regarding MQTT5-only features, e.g. specialized per-subscription callbacks only available when using MQTT5.

@Mixser (Contributor) commented Nov 14, 2023

Hi, thanks for your interest in GMQTT 🎉

I'm simply tracking the mid and resolving an asyncio.Future on a matching (Un)SubAck.

Yes, this is the right way it could be implemented, but you need to be careful about how you manage reconnection and other things, because there is a possibility of leaving leftover asyncio.Future objects that will never be completed (e.g. due to network reconnections).

For 2) I'm assigning a (random) subscription identifier (implies MQTT5), maintaining a sub_id -> callback mapping and filtering incoming messages to call the callbacks or fall back to the default on_message callback.

It's quite tricky, because even if the server implements v5, it might not support subscription identifiers at all (see CONNACK Properties -> 3.2.2.3.12 Subscription Identifiers Available in the spec).

About style, @Lenka42 could you support? 😉

@okaestne replied

Yes, this is the right way it could be implemented, but you need to be careful about how you manage reconnection and other things, because there is a possibility of leaving leftover asyncio.Future objects that will never be completed (e.g. due to network reconnections).

Yeah, that's definitely something to consider. I'm using asyncio.wait_for() to set a timeout and asyncio.Future.add_done_callback() to remove the Future from the tracking list. I haven't tested how reconnects are handled yet.
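Building on the tracker sketch above (again illustrative, not the actual wrapper), that pattern could look like:

import asyncio

async def await_suback(tracker, mid, timeout=5.0):
    future = tracker.track(mid)
    # A wait_for() timeout cancels the future, and a cancelled future
    # still counts as done, so the entry is removed either way and no
    # never-completed future survives a reconnect.
    future.add_done_callback(lambda f: tracker._pending.pop(mid, None))
    return await asyncio.wait_for(future, timeout)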

It's quite tricky, because even if the server implements v5, it might not support subscription identifiers at all (see CONNACK Properties -> 3.2.2.3.12 Subscription Identifiers Available in the spec).

About style, @Lenka42 could you support? 😉

Yes, that's another thing that could be tested when connecting. I understand there is a potential conflict between flexibility and ease of use; users should still be able to decide which headers/properties are sent to the broker. I also can't tell how many v5-capable brokers do or do not support subscription IDs, but at least mosquitto does 😁
