Skip to content

Commit

Permalink
asyncio-based Socket Mode client improvements (#1117)
Browse files Browse the repository at this point in the history
* Add initial ping right after establishing a new connection in AIOHTTP-based SocketModeClient
* Add is_ping_pong_failing check to is_connected method
* Many improvements to the session monitoring job
* Fix syntax error in Python 3.6
* Enable Socket Mode tests
  • Loading branch information
seratch committed Sep 18, 2021
1 parent 9b3cf06 commit 9fd264f
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-build.yml
Expand Up @@ -16,7 +16,7 @@ jobs:
python-version: ['3.6', '3.7', '3.8', '3.9']
env:
PYTHON_SLACK_SDK_MOCK_SERVER_MODE: 'threading'
CI_UNSTABLE_TESTS_SKIP_ENABLED: '1'
#CI_UNSTABLE_TESTS_SKIP_ENABLED: '1'
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand Down
194 changes: 163 additions & 31 deletions slack_sdk/socket_mode/aiohttp/__init__.py
Expand Up @@ -136,59 +136,116 @@ def __init__(
self.message_processor = asyncio.ensure_future(self.process_messages())

async def monitor_current_session(self) -> None:
# In the asyncio runtime, accessing a shared object (self.current_session here) from
# multiple tasks can cause race conditions and errors.
# To avoid this, we only access the session that was active when this loop started.
session: ClientWebSocketResponse = self.current_session
session_id: str = self.build_session_id(session)

if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"A new monitor_current_session() execution loop for {session_id} started"
)
try:
logging_interval = 100
counter_for_logging = 0

while not self.closed:
if session != self.current_session:
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"The monitor_current_session task for {session_id} is now cancelled"
)
break
try:
# The logging here is for detailed troubleshooting of potential issues in this client.
# If you don't see this log for a while, it means that
# this monitor_current_session execution is no longer working for some reason.
counter_for_logging = (counter_for_logging + 1) % logging_interval
if counter_for_logging == 0:
log_message = (
f"{logging_interval} session verification executed after the previous same log"
f" ({session_id})"
)
self.logger.debug(log_message)

await asyncio.sleep(self.ping_interval)
if self.current_session is not None:

if session is not None and session.closed is False:
t = time.time()
if self.last_ping_pong_time is None:
self.last_ping_pong_time = float(t)
await self.current_session.ping(f"sdk-ping-pong:{t}")
try:
await session.ping(f"sdk-ping-pong:{t}")
except Exception as e:
# The ping() method can fail for some reason.
# To establish a new connection even in this scenario,
# we ignore the exception here.
self.logger.warning(
f"Failed to send a ping message ({session_id}): {e}"
)

if self.auto_reconnect_enabled:
should_reconnect = False
if self.current_session is None or self.current_session.closed:
if session is None or session.closed:
self.logger.info(
"The session seems to be already closed. Reconnecting..."
f"The session ({session_id}) seems to be already closed. Reconnecting..."
)
should_reconnect = True

if self.last_ping_pong_time is not None:
if await self.is_ping_pong_failing():
disconnected_seconds = int(
time.time() - self.last_ping_pong_time
)
if disconnected_seconds >= (self.ping_interval * 4):
self.logger.info(
"The connection seems to be stale. Reconnecting..."
f" reason: disconnected for {disconnected_seconds}+ seconds)"
)
self.stale = True
self.last_ping_pong_time = None
should_reconnect = True
self.logger.info(
f"The session ({session_id}) seems to be stale. Reconnecting..."
f" reason: disconnected for {disconnected_seconds}+ seconds)"
)
self.stale = True
self.last_ping_pong_time = None
should_reconnect = True

if should_reconnect is True or not await self.is_connected():
await self.connect_to_new_endpoint()

except Exception as e:
self.logger.error(
"Failed to check the current session or reconnect to the server "
f"Failed to check the current session ({session_id}) or reconnect to the server "
f"(error: {type(e).__name__}, message: {e})"
)
except asyncio.CancelledError:
if self.trace_enabled:
if self.logger.level <= logging.DEBUG:
self.logger.debug(
"The running monitor_current_session task is now cancelled"
f"The monitor_current_session task for {session_id} is now cancelled"
)
raise

async def receive_messages(self) -> None:
# In the asyncio runtime, accessing a shared object (self.current_session here) from
# multiple tasks can cause race conditions and errors.
# To avoid this, we only access the session that was active when this loop started.
session = self.current_session
session_id = self.build_session_id(session)
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"A new receive_messages() execution loop with {session_id} started"
)
try:
consecutive_error_count = 0
logging_interval = 100
counter_for_logging = 0

while not self.closed:
if session != self.current_session:
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"The running receive_messages task for {session_id} is now cancelled"
)
break
try:
message: WSMessage = await self.current_session.receive()
message: WSMessage = await session.receive()
if self.trace_enabled and self.logger.level <= logging.DEBUG:
# The following logging prints every single received message except empty message data ones.
type = WSMsgType(message.type)
message_type = type.name if type is not None else message.type
message_data = message.data
Expand All @@ -197,8 +254,27 @@ async def receive_messages(self) -> None:
if len(message_data) > 0:
# To skip the empty message that Slack server-side often sends
self.logger.debug(
f"Received message (type: {message_type}, data: {message_data}, extra: {message.extra})"
f"Received message "
f"(type: {message_type}, "
f"data: {message_data}, "
f"extra: {message.extra}, "
f"session: {session_id})"
)

# The logging here is for detailed troubleshooting of potential issues in this client.
# If you don't see this log for a while, it means that
# this receive_messages execution is no longer working for some reason.
if self.logger.level <= logging.DEBUG:
counter_for_logging = (
counter_for_logging + 1
) % logging_interval
if counter_for_logging == 0:
log_message = (
f"{logging_interval} WebSocket messages received "
f"after the previous same log ({session_id})"
)
self.logger.debug(log_message)

if message is not None:
if message.type == WSMsgType.TEXT:
message_data = message.data
Expand All @@ -208,7 +284,7 @@ async def receive_messages(self) -> None:
elif message.type == WSMsgType.CLOSE:
if self.auto_reconnect_enabled:
self.logger.info(
"Received CLOSE event. Reconnecting..."
f"Received CLOSE event from {session_id}. Reconnecting..."
)
await self.connect_to_new_endpoint()
for listener in self.on_close_listeners:
Expand All @@ -220,7 +296,7 @@ async def receive_messages(self) -> None:
await asyncio.sleep(self.ping_interval)
continue
elif message.type == WSMsgType.PING:
await self.current_session.pong(message.data)
await session.pong(message.data)
continue
elif message.type == WSMsgType.PONG:
if message.data is not None:
Expand All @@ -235,31 +311,56 @@ async def receive_messages(self) -> None:
except Exception as e:
self.logger.warning(
f"Failed to parse the last_ping_pong_time value from {str_message_data}"
f" - error : {e}"
f" - error : {e}, session: {session_id}"
)
continue
consecutive_error_count = 0
except Exception as e:
consecutive_error_count += 1
self.logger.error(
f"Failed to receive or enqueue a message: {type(e).__name__}, {e}"
f"Failed to receive or enqueue a message: {type(e).__name__}, {e} ({session_id})"
)
if isinstance(e, ClientConnectionError):
await asyncio.sleep(self.ping_interval)
else:
await asyncio.sleep(consecutive_error_count)
except asyncio.CancelledError:
if self.trace_enabled:
self.logger.debug("The running receive_messages task is now cancelled")
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"The running receive_messages task for {session_id} is now cancelled"
)
raise

async def is_ping_pong_failing(self) -> bool:
    """Tell whether the ping-pong exchange has been silent for too long.

    Returns:
        False when no ping-pong timestamp has been recorded yet
        (``self.last_ping_pong_time`` is None). Otherwise True once the
        whole number of seconds elapsed since that timestamp reaches four
        times ``self.ping_interval``.
    """
    last_seen = self.last_ping_pong_time
    if last_seen is None:
        # Nothing recorded yet, so there is nothing to judge as failing.
        return False
    # Truncate to whole seconds before comparing against the threshold.
    silent_for = int(time.time() - last_seen)
    threshold = self.ping_interval * 4
    return silent_for >= threshold

async def is_connected(self) -> bool:
    """Report whether this client currently has a usable connection.

    The connection counts as active only when the client is not closed,
    is not marked stale, holds a session that is not closed, and the
    ping-pong check is not failing.

    When the connection is inactive and the logger is at DEBUG level or
    lower, a diagnostic line describing each condition is logged.

    Returns:
        True if every condition above holds, otherwise False.
    """
    connected: bool = (
        not self.closed
        and not self.stale
        and self.current_session is not None
        and not self.current_session.closed
        and not await self.is_ping_pong_failing()
    )
    if connected is False and self.logger.level <= logging.DEBUG:
        is_ping_pong_failing = await self.is_ping_pong_failing()
        session_id = await self.session_id()
        # A missing session is one of the reasons for being disconnected,
        # so it must not be dereferenced unconditionally here. Read it after
        # the awaits above so the None check and the attribute access see
        # the same object.
        current_session = self.current_session
        session_closed = None if current_session is None else current_session.closed
        self.logger.debug(
            "Inactive connection detected ("
            f"session_id: {session_id}, "
            f"closed: {self.closed}, "
            f"stale: {self.stale}, "
            f"current_session.closed: {session_closed}, "
            f"is_ping_pong_failing: {is_ping_pong_failing}"
            ")"
        )
    return connected

async def session_id(self) -> str:
    """Return the identifier of the session this client currently holds.

    The value is whatever ``build_session_id`` produces for
    ``self.current_session``.
    """
    make_id = self.build_session_id
    return make_id(self.current_session)

async def connect(self):
old_session = None if self.current_session is None else self.current_session
Expand All @@ -271,42 +372,66 @@ async def connect(self):
heartbeat=self.ping_interval,
proxy=self.proxy,
)
session_id: str = await self.session_id()
self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
self.stale = False
self.logger.info("A new session has been established")
self.logger.info(f"A new session ({session_id}) has been established")

# The first ping from the new connection
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"Sending a ping message with the newly established connection ({session_id})..."
)
t = time.time()
await self.current_session.ping(f"sdk-ping-pong:{t}")

if self.current_session_monitor is not None:
self.current_session_monitor.cancel()

self.current_session_monitor = asyncio.ensure_future(
self.monitor_current_session()
)
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"A new monitor_current_session() executor has been recreated for {session_id}"
)

if self.message_receiver is not None:
self.message_receiver.cancel()

self.message_receiver = asyncio.ensure_future(self.receive_messages())
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"A new receive_messages() executor has been recreated for {session_id}"
)

if old_session is not None:
await old_session.close()
self.logger.info("The old session has been abandoned")
old_session_id = self.build_session_id(old_session)
self.logger.info(f"The old session ({old_session_id}) has been abandoned")

async def disconnect(self):
if self.current_session is not None:
await self.current_session.close()
self.logger.info("The session has been abandoned")
session_id = await self.session_id()
self.logger.info(
f"The current session ({session_id}) has been abandoned by disconnect() method call"
)

async def send_message(self, message: str):
session_id = await self.session_id()
if self.logger.level <= logging.DEBUG:
self.logger.debug(f"Sending a message: {message}")
self.logger.debug(
f"Sending a message: {message} from session: {session_id}"
)
try:
await self.current_session.send_str(message)
except ConnectionError as e:
# We rarely get this exception while replacing the underlying WebSocket connections.
# We can do one more try here as the self.current_session should be ready now.
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"Failed to send a message (error: {e}, message: {message})"
f"Failed to send a message (error: {e}, message: {message}, session: {session_id})"
" as the underlying connection was replaced. Retrying the same request only one time..."
)
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
Expand All @@ -317,7 +442,8 @@ async def send_message(self, message: str):
await self.current_session.send_str(message)
else:
self.logger.warning(
"The current session is no longer active. Failed to send a message"
f"The current session ({session_id}) is no longer active. "
"Failed to send a message"
)
raise e
finally:
Expand All @@ -336,3 +462,9 @@ async def close(self):
self.message_receiver.cancel()
if self.aiohttp_client_session is not None:
await self.aiohttp_client_session.close()

@classmethod
def build_session_id(cls, session: ClientWebSocketResponse) -> str:
    """Derive a short identifier for a WebSocket session, for use in logs.

    Args:
        session: The session object to identify; may be None.

    Returns:
        ``"s_"`` followed by ``hash(session)``, or None when no session
        is given.
    """
    # NOTE(review): the annotation says ``str`` but the no-session case yields None.
    return None if session is None else f"s_{hash(session)}"

0 comments on commit 9fd264f

Please sign in to comment.