Skip to content

Commit

Permalink
Fix for #1600 (#1601)
Browse files Browse the repository at this point in the history
* Fix for #1600
wrapped txaio.resolve() calls in try/except to catch InvalidStateErrors

* Fix for #1600
Changed to stop the call from being "resolve"ed if the call has been cancelled, instead of catching the error

* Fix for #1600
- Added is_called check everywhere resolve is being called
- returning instead of passing as there is no need to carry on the method

Co-authored-by: David Ellis <david.ellis@atamate.com>
  • Loading branch information
Skully17 and David Ellis committed Dec 11, 2022
1 parent 8c57aff commit a99ff5d
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions autobahn/wamp/protocol.py
Expand Up @@ -801,6 +801,9 @@ def _error(e):
# get and pop outstanding publish request
publish_request = self._publish_reqs.pop(msg.request)

if txaio.is_future(publish_request.on_reply) and txaio.is_called(publish_request.on_reply):
return

# create a new publication object
publication = Publication(msg.publication, was_encrypted=publish_request.was_encrypted)

Expand All @@ -816,6 +819,9 @@ def _error(e):
# get and pop outstanding subscribe request
request = self._subscribe_reqs.pop(msg.request)

if txaio.is_future(request.on_reply) and txaio.is_called(request.on_reply):
return

# create new handler subscription list for subscription ID if not yet tracked
if msg.subscription not in self._subscriptions:
self._subscriptions[msg.subscription] = []
Expand All @@ -837,6 +843,9 @@ def _error(e):
# get and pop outstanding subscribe request
request = self._unsubscribe_reqs.pop(msg.request)

if txaio.is_future(request.on_reply) and txaio.is_called(request.on_reply):
return

# if the subscription still exists, mark as inactive and remove ..
if request.subscription_id in self._subscriptions:
for subscription in self._subscriptions[request.subscription_id]:
Expand Down Expand Up @@ -924,6 +933,9 @@ def _error(fail):
# user callback that gets fired
on_reply = call_request.on_reply

if txaio.is_future(on_reply) and txaio.is_called(on_reply):
return

# above might already have rejected, so we guard ..
if enc_err:
txaio.reject(on_reply, enc_err)
Expand Down Expand Up @@ -1201,6 +1213,9 @@ def error(err):
# get and pop outstanding register request
request = self._register_reqs.pop(msg.request)

if txaio.is_future(request.on_reply) and txaio.is_called(request.on_reply):
return

# create new registration if not yet tracked
if msg.registration not in self._registrations:
registration = Registration(self, msg.registration, request.procedure, request.endpoint)
Expand Down Expand Up @@ -1235,6 +1250,9 @@ def error(err):
# get and pop outstanding subscribe request
request = self._unregister_reqs.pop(msg.request)

if txaio.is_future(request.on_reply) and txaio.is_called(request.on_reply):
return

# if the registration still exists, mark as inactive and remove ..
if request.registration_id in self._registrations:
self._registrations[request.registration_id].active = False
Expand Down

0 comments on commit a99ff5d

Please sign in to comment.