Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for stack growth on reconnect #854

Merged
merged 51 commits into from
Sep 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
71a6b66
rel example
bubbleboy14 Dec 7, 2021
0a090e6
tweaked rel example for linter
bubbleboy14 Dec 7, 2021
934f665
added rel note to examples.rst
bubbleboy14 Dec 8, 2021
0b14e89
slightly more compact example
bubbleboy14 Dec 8, 2021
255dcae
added example header
bubbleboy14 Dec 8, 2021
93fa6a8
matched wording
bubbleboy14 Dec 8, 2021
cb73493
Merge remote-tracking branch 'upstream/master'
bubbleboy14 Feb 13, 2022
3ccecbd
_socket.recv(): _recv() except socket.error - changed or to and; adde…
bubbleboy14 Feb 13, 2022
58d4b5f
_app - custom dispatcher check_callback() integration (and fixed pyev…
bubbleboy14 Feb 13, 2022
7514d17
Add clarifying comment, rerun CI tests
engn33r Feb 25, 2022
85342a8
Add space to make linter happy
engn33r Feb 25, 2022
218e33c
merged upstream
bubbleboy14 Jul 11, 2022
a90f540
Merge branch 'master' of github.com:bubbleboy14/websocket-client
bubbleboy14 Jul 11, 2022
754cf66
working reconnect
bubbleboy14 Jul 11, 2022
56f5f12
rmed logs
bubbleboy14 Jul 11, 2022
8b6ae4b
added _logging.warning() disconnected/reconnected notifications to ha…
bubbleboy14 Jul 12, 2022
11b2f69
moved connect notification and dispatcher.read() (if doread kwarg [de…
bubbleboy14 Jul 12, 2022
2d9f27b
run_forever(): reconnect kwarg now specifies sleep() time (defualt 5)
bubbleboy14 Jul 15, 2022
8916d4f
handleDisconnect(): fixed log msg
bubbleboy14 Jul 17, 2022
6572125
run_forever() refactor: stabilized stack frame count (at least in rel…
bubbleboy14 Jul 17, 2022
23f3287
dispatcher simplification via DispatcherBase and DispatcherBase/Wrapp…
bubbleboy14 Jul 17, 2022
d905d23
_logging: info(); enableTrace() supports level kwarg (default "DEBUG")
bubbleboy14 Jul 17, 2022
f6088c9
handleDisconnect() uses info() log
bubbleboy14 Jul 17, 2022
07cfd26
Fix linting errors
engn33r Aug 10, 2022
c588d48
merged in upstream
bubbleboy14 Aug 26, 2022
2b10daf
moved timeout() from Dispatcher to DispatcherBase (thus also applying…
bubbleboy14 Aug 26, 2022
2840403
reconnect()s for DispatcherBase (uses while loop) and WrappedDispatch…
bubbleboy14 Aug 26, 2022
64d48c5
custom_dispatcher switch in handleDisconnect()
bubbleboy14 Aug 26, 2022
592ab59
merged
bubbleboy14 Aug 26, 2022
f5b1fc7
WrappedDispatcher constructor registers keyboard interrupt signal
bubbleboy14 Aug 26, 2022
baa3365
DispatcherBase.reconnect(): wrapped while loop in KeyboardInterrupt t…
bubbleboy14 Aug 26, 2022
8537e50
fixed lint errors
bubbleboy14 Aug 30, 2022
99ff287
_app: RECONNECT (default 5) and setReconnect() setter; WebSocketApp.r…
bubbleboy14 Aug 30, 2022
d1a843d
tests.test_app: ws.setReconnect(0) (may fix test stall issue)
bubbleboy14 Aug 30, 2022
d9dc788
oops, added setReconnect import to websocket __init__
bubbleboy14 Aug 30, 2022
fbcc7e9
blank line for linter
bubbleboy14 Aug 30, 2022
8d9a08a
linter line
bubbleboy14 Aug 30, 2022
adca14d
added rel to setup extras_require{test}[]
bubbleboy14 Aug 30, 2022
2f438fc
adjusted testRunForeverDispatcher() to use rel (including dispatch())
bubbleboy14 Aug 30, 2022
6077fe5
setup: moved rel dep from extras_require{test}[] to tests_require[]
bubbleboy14 Aug 30, 2022
f1415ba
meh trying install_requires[] (tests_require[] depped??)
bubbleboy14 Aug 30, 2022
e716454
set RECONNECT (run_forever() reconnect kwarg default) to 0 (can be al…
bubbleboy14 Sep 3, 2022
5d318ff
rmed rel from install_requires[] (only added for tests, and was not w…
bubbleboy14 Sep 3, 2022
c373193
test_app: rmed ws.setReconnect(0) (0 is new default)
bubbleboy14 Sep 3, 2022
f8596f2
run_forever() reconnect->RECONNECT fallback in func instead of kwarg …
bubbleboy14 Sep 3, 2022
a304b7a
test_app: disabled rel import (unsure how to set up test dependency) …
bubbleboy14 Sep 3, 2022
1f0bcab
linter fixes
bubbleboy14 Sep 3, 2022
c35700a
linter comment spaces
bubbleboy14 Sep 3, 2022
521f793
run_forever() returns False to pass testRunForeverTeardownCleanExit test
bubbleboy14 Sep 3, 2022
82f6d97
run_forever() returns False unless error (handleDisconnect() changes …
bubbleboy14 Sep 3, 2022
b0817c0
rval->self.has_errored
bubbleboy14 Sep 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion websocket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
limitations under the License.
"""
from ._abnf import *
from ._app import WebSocketApp
from ._app import WebSocketApp, setReconnect
from ._core import *
from ._exceptions import *
from ._logging import *
Expand Down
56 changes: 43 additions & 13 deletions websocket/_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@

__all__ = ["WebSocketApp"]

RECONNECT = 0


def setReconnect(reconnectInterval):
global RECONNECT
RECONNECT = reconnectInterval


class DispatcherBase:
"""
Expand All @@ -39,6 +46,19 @@ def __init__(self, app, ping_timeout):
self.app = app
self.ping_timeout = ping_timeout

def timeout(self, seconds, callback):
time.sleep(seconds)
callback()

def reconnect(self, seconds, reconnector):
try:
while True:
_logging.info("reconnect() - retrying in %s seconds [%s frames in stack]" % (seconds, len(inspect.stack())))
time.sleep(seconds)
reconnector(reconnecting=True)
except KeyboardInterrupt as e:
_logging.info("User exited %s" % (e,))


class Dispatcher(DispatcherBase):
"""
Expand All @@ -56,10 +76,6 @@ def read(self, sock, read_callback, check_callback):
check_callback()
sel.close()

def timeout(self, seconds, callback):
time.sleep(seconds)
callback()


class SSLDispatcher(DispatcherBase):
"""
Expand Down Expand Up @@ -96,14 +112,18 @@ def __init__(self, app, ping_timeout, dispatcher):
self.app = app
self.ping_timeout = ping_timeout
self.dispatcher = dispatcher
dispatcher.signal(2, dispatcher.abort) # keyboard interrupt

def read(self, sock, read_callback, check_callback):
self.dispatcher.read(sock, read_callback)
self.ping_timeout and self.dispatcher.timeout(self.ping_timeout, check_callback)
self.ping_timeout and self.timeout(self.ping_timeout, check_callback)

def timeout(self, seconds, callback):
self.dispatcher.timeout(seconds, callback)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be self.timeout(seconds, callback) instead of self.dispatcher.timeout(seconds, callback)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - it's defined on line 114


def reconnect(self, seconds, reconnector):
self.timeout(seconds, reconnector)


class WebSocketApp:
"""
Expand Down Expand Up @@ -195,6 +215,7 @@ def __init__(self, url, header=None,
self.last_pong_tm = 0
self.subprotocols = subprotocols
self.prepared_socket = socket
self.has_errored = False

def send(self, data, opcode=ABNF.OPCODE_TEXT):
"""
Expand Down Expand Up @@ -240,7 +261,7 @@ def run_forever(self, sockopt=None, sslopt=None,
http_proxy_timeout=None,
skip_utf8_validation=False,
host=None, origin=None, dispatcher=None,
suppress_origin=False, proxy_type=None, reconnect=5):
suppress_origin=False, proxy_type=None, reconnect=None):
"""
Run event loop for WebSocket framework.

Expand Down Expand Up @@ -290,6 +311,9 @@ def run_forever(self, sockopt=None, sslopt=None,
True if any other exception was raised during a loop.
"""

if reconnect is None:
reconnect = RECONNECT

if ping_timeout is not None and ping_timeout <= 0:
raise WebSocketException("Ensure ping_timeout > 0")
if ping_interval is not None and ping_interval < 0:
Expand Down Expand Up @@ -331,7 +355,7 @@ def teardown(close_frame=None):
# Finally call the callback AFTER all teardown is complete
self._callback(self.on_close, close_status_code, close_reason)

def setSock():
def setSock(reconnecting=False):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this reconnecting variable useful for? I see it in line 370 but I don't think I'm clear on how its use on that line impacts the reconnect process. This variable name is very close to reconnect, so a different name that better differentiates would be better.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's so that on line 370 it doesn't call handleDisconnect() again if it's already "reconnecting". That's actually the heart of this pull request - in synchronous (default) mode, the stack grows if handleDisconnect() is called (by way of a couple intermediate functions) recursively. Thus, when DispatcherBase.reconnect() (line 51) calls reconnector() (which is setSock()), it passes reconnecting=True, so that if the connection fails it exits setSock(), returns to DispatcherBase.reconnect() (popping the setSock() frame off the stack), and tries again after the wait interval.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And of course I'm open to changing "reconnecting" if there's a better name for that kwarg :)

self.sock = WebSocket(
self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
fire_cont_frame=self.on_cont_message is not None,
Expand All @@ -352,18 +376,21 @@ def setSock():

_logging.warning("websocket connected")
dispatcher.read(self.sock.sock, read, check)
except (Exception, ConnectionRefusedError, KeyboardInterrupt, SystemExit) as e:
handleDisconnect(e)
except (WebSocketConnectionClosedException, ConnectionRefusedError, KeyboardInterrupt, SystemExit, Exception) as e:
_logging.error("%s - %s" % (e, reconnect and "reconnecting" or "goodbye"))
reconnecting or handleDisconnect(e)

def read():
if not self.keep_running:
return teardown()

try:
op_code, frame = self.sock.recv_data_frame(True)
except WebSocketConnectionClosedException as e:
_logging.error("WebSocketConnectionClosedException - %s" % (reconnect and "reconnecting" or "goodbye"))
return handleDisconnect(e)
except (WebSocketConnectionClosedException, KeyboardInterrupt) as e:
if custom_dispatcher:
return handleDisconnect(e)
else:
raise e
if op_code == ABNF.OPCODE_CLOSE:
return teardown(frame)
elif op_code == ABNF.OPCODE_PING:
Expand Down Expand Up @@ -398,16 +425,18 @@ def check():
return True

def handleDisconnect(e):
self.has_errored = True
self._callback(self.on_error, e)
if isinstance(e, SystemExit):
# propagate SystemExit further
raise
if reconnect and not isinstance(e, KeyboardInterrupt):
_logging.info("websocket disconnected (retrying in %s seconds) [%s frames in stack]" % (reconnect, len(inspect.stack())))
dispatcher.timeout(reconnect, setSock)
dispatcher.reconnect(reconnect, setSock)
else:
teardown()

custom_dispatcher = bool(dispatcher)
dispatcher = self.create_dispatcher(ping_timeout, dispatcher, not not sslopt)

if ping_interval:
Expand All @@ -418,6 +447,7 @@ def handleDisconnect(e):
thread.start()

setSock()
return self.has_errored

def create_dispatcher(self, ping_timeout, dispatcher=None, is_ssl=False):
if dispatcher: # If custom dispatcher is set, use WrappedDispatcher
Expand Down
7 changes: 5 additions & 2 deletions websocket/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def on_close(self, *args, **kwargs):
app = ws.WebSocketApp('ws://127.0.0.1:' + LOCAL_WS_SERVER_PORT, on_open=on_open, on_close=on_close, on_message=on_message)
app.run_forever()

@unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled")
# @unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled")
@unittest.skipUnless(False, "Test disabled for now (requires rel)")
def testRunForeverDispatcher(self):
""" A WebSocketApp should keep running as long as its self.keep_running
is not False (in the boolean context).
Expand All @@ -98,7 +99,9 @@ def on_message(wsapp, message):
self.close()

app = ws.WebSocketApp('ws://127.0.0.1:' + LOCAL_WS_SERVER_PORT, on_open=on_open, on_message=on_message)
app.run_forever(dispatcher="Dispatcher")
app.run_forever(dispatcher="Dispatcher") # doesn't work
# app.run_forever(dispatcher=rel) # would work
# rel.dispatch()

@unittest.skipUnless(TEST_WITH_LOCAL_SERVER, "Tests using local websocket server are disabled")
def testRunForeverTeardownCleanExit(self):
Expand Down