Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asyncio-based Socket Mode client improvements #1117

Merged
merged 5 commits into from Sep 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-build.yml
Expand Up @@ -16,7 +16,7 @@ jobs:
python-version: ['3.6', '3.7', '3.8', '3.9']
env:
PYTHON_SLACK_SDK_MOCK_SERVER_MODE: 'threading'
CI_UNSTABLE_TESTS_SKIP_ENABLED: '1'
#CI_UNSTABLE_TESTS_SKIP_ENABLED: '1'
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand Down
194 changes: 163 additions & 31 deletions slack_sdk/socket_mode/aiohttp/__init__.py
Expand Up @@ -136,59 +136,116 @@ def __init__(
self.message_processor = asyncio.ensure_future(self.process_messages())

async def monitor_current_session(self) -> None:
# In the asyncio runtime, accessing a shared object (self.current_session here) from
# multiple tasks can cause race conditions and errors.
# To avoid such, we access only the session that is active when this loop starts.
session: ClientWebSocketResponse = self.current_session
session_id: str = self.build_session_id(session)

if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"A new monitor_current_session() execution loop for {session_id} started"
)
try:
logging_interval = 100
counter_for_logging = 0

while not self.closed:
if session != self.current_session:
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"The monitor_current_session task for {session_id} is now cancelled"
)
break
try:
# The logging here is for detailed trouble shooting of potential issues in this client.
# If you don't see this log for a while, it means that
# this receive_messages execution is no longer working for some reason.
counter_for_logging = (counter_for_logging + 1) % logging_interval
if counter_for_logging == 0:
log_message = (
f"{logging_interval} session verification executed after the previous same log"
f" ({session_id})"
)
self.logger.debug(log_message)

await asyncio.sleep(self.ping_interval)
if self.current_session is not None:

if session is not None and session.closed is False:
t = time.time()
if self.last_ping_pong_time is None:
self.last_ping_pong_time = float(t)
await self.current_session.ping(f"sdk-ping-pong:{t}")
try:
await session.ping(f"sdk-ping-pong:{t}")
except Exception as e:
# The ping() method can fail for some reason.
# To establish a new connection even in this scenario,
# we ignore the exception here.
self.logger.warning(
f"Failed to send a ping message ({session_id}): {e}"
)

if self.auto_reconnect_enabled:
should_reconnect = False
if self.current_session is None or self.current_session.closed:
if session is None or session.closed:
self.logger.info(
"The session seems to be already closed. Reconnecting..."
f"The session ({session_id}) seems to be already closed. Reconnecting..."
)
should_reconnect = True

if self.last_ping_pong_time is not None:
if await self.is_ping_pong_failing():
disconnected_seconds = int(
time.time() - self.last_ping_pong_time
)
if disconnected_seconds >= (self.ping_interval * 4):
self.logger.info(
"The connection seems to be stale. Reconnecting..."
f" reason: disconnected for {disconnected_seconds}+ seconds)"
)
self.stale = True
self.last_ping_pong_time = None
should_reconnect = True
self.logger.info(
f"The session ({session_id}) seems to be stale. Reconnecting..."
f" reason: disconnected for {disconnected_seconds}+ seconds)"
)
self.stale = True
self.last_ping_pong_time = None
should_reconnect = True

if should_reconnect is True or not await self.is_connected():
await self.connect_to_new_endpoint()

except Exception as e:
self.logger.error(
"Failed to check the current session or reconnect to the server "
f"Failed to check the current session ({session_id}) or reconnect to the server "
f"(error: {type(e).__name__}, message: {e})"
)
except asyncio.CancelledError:
if self.trace_enabled:
if self.logger.level <= logging.DEBUG:
self.logger.debug(
"The running monitor_current_session task is now cancelled"
f"The monitor_current_session task for {session_id} is now cancelled"
)
raise

async def receive_messages(self) -> None:
# In the asyncio runtime, accessing a shared object (self.current_session here) from
# multiple tasks can cause race conditions and errors.
# To avoid such, we access only the session that is active when this loop starts.
session = self.current_session
session_id = self.build_session_id(session)
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"A new receive_messages() execution loop with {session_id} started"
)
try:
consecutive_error_count = 0
logging_interval = 100
counter_for_logging = 0

while not self.closed:
if session != self.current_session:
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"The running receive_messages task for {session_id} is now cancelled"
)
break
try:
message: WSMessage = await self.current_session.receive()
message: WSMessage = await session.receive()
if self.trace_enabled and self.logger.level <= logging.DEBUG:
# The following logging prints every single received message except empty message data ones.
type = WSMsgType(message.type)
message_type = type.name if type is not None else message.type
message_data = message.data
Expand All @@ -197,8 +254,27 @@ async def receive_messages(self) -> None:
if len(message_data) > 0:
# To skip the empty message that Slack server-side often sends
self.logger.debug(
f"Received message (type: {message_type}, data: {message_data}, extra: {message.extra})"
f"Received message "
f"(type: {message_type}, "
f"data: {message_data}, "
f"extra: {message.extra}, "
f"session: {session_id})"
)

# The logging here is for detailed trouble shooting of potential issues in this client.
# If you don't see this log for a while, it means that
# this receive_messages execution is no longer working for some reason.
if self.logger.level <= logging.DEBUG:
counter_for_logging = (
counter_for_logging + 1
) % logging_interval
if counter_for_logging == 0:
log_message = (
f"{logging_interval} WebSocket messages received "
f"after the previous same log ({session_id})"
)
self.logger.debug(log_message)

if message is not None:
if message.type == WSMsgType.TEXT:
message_data = message.data
Expand All @@ -208,7 +284,7 @@ async def receive_messages(self) -> None:
elif message.type == WSMsgType.CLOSE:
if self.auto_reconnect_enabled:
self.logger.info(
"Received CLOSE event. Reconnecting..."
f"Received CLOSE event from {session_id}. Reconnecting..."
)
await self.connect_to_new_endpoint()
for listener in self.on_close_listeners:
Expand All @@ -220,7 +296,7 @@ async def receive_messages(self) -> None:
await asyncio.sleep(self.ping_interval)
continue
elif message.type == WSMsgType.PING:
await self.current_session.pong(message.data)
await session.pong(message.data)
continue
elif message.type == WSMsgType.PONG:
if message.data is not None:
Expand All @@ -235,31 +311,56 @@ async def receive_messages(self) -> None:
except Exception as e:
self.logger.warning(
f"Failed to parse the last_ping_pong_time value from {str_message_data}"
f" - error : {e}"
f" - error : {e}, session: {session_id}"
)
continue
consecutive_error_count = 0
except Exception as e:
consecutive_error_count += 1
self.logger.error(
f"Failed to receive or enqueue a message: {type(e).__name__}, {e}"
f"Failed to receive or enqueue a message: {type(e).__name__}, {e} ({session_id})"
)
if isinstance(e, ClientConnectionError):
await asyncio.sleep(self.ping_interval)
else:
await asyncio.sleep(consecutive_error_count)
except asyncio.CancelledError:
if self.trace_enabled:
self.logger.debug("The running receive_messages task is now cancelled")
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"The running receive_messages task for {session_id} is now cancelled"
)
raise

async def is_ping_pong_failing(self) -> bool:
if self.last_ping_pong_time is None:
return False
disconnected_seconds = int(time.time() - self.last_ping_pong_time)
return disconnected_seconds >= (self.ping_interval * 4)

async def is_connected(self) -> bool:
return (
connected: bool = (
not self.closed
and not self.stale
and self.current_session is not None
and not self.current_session.closed
and not await self.is_ping_pong_failing()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the ping-pong validation logic to is_ping_pong_failing method and added it to is_connected() method

)
if connected is False and self.logger.level <= logging.DEBUG:
is_ping_pong_failing = await self.is_ping_pong_failing()
session_id = await self.session_id()
self.logger.debug(
"Inactive connection detected ("
f"session_id: {session_id}, "
f"closed: {self.closed}, "
f"stale: {self.stale}, "
f"current_session.closed: {self.current_session.closed}, "
f"is_ping_pong_failing: {is_ping_pong_failing}"
")"
)
return connected

async def session_id(self) -> str:
return self.build_session_id(self.current_session)

async def connect(self):
old_session = None if self.current_session is None else self.current_session
Expand All @@ -271,42 +372,66 @@ async def connect(self):
heartbeat=self.ping_interval,
proxy=self.proxy,
)
session_id: str = await self.session_id()
self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
self.stale = False
self.logger.info("A new session has been established")
self.logger.info(f"A new session ({session_id}) has been established")

# The first ping from the new connection
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"Sending a ping message with the newly established connection ({session_id})..."
)
t = time.time()
await self.current_session.ping(f"sdk-ping-pong:{t}")

if self.current_session_monitor is not None:
self.current_session_monitor.cancel()

self.current_session_monitor = asyncio.ensure_future(
self.monitor_current_session()
)
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"A new monitor_current_session() executor has been recreated for {session_id}"
)

if self.message_receiver is not None:
self.message_receiver.cancel()

self.message_receiver = asyncio.ensure_future(self.receive_messages())
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"A new receive_messages() executor has been recreated for {session_id}"
)

if old_session is not None:
await old_session.close()
self.logger.info("The old session has been abandoned")
old_session_id = self.build_session_id(old_session)
self.logger.info(f"The old session ({old_session_id}) has been abandoned")

async def disconnect(self):
if self.current_session is not None:
await self.current_session.close()
self.logger.info("The session has been abandoned")
session_id = await self.session_id()
self.logger.info(
f"The current session ({session_id}) has been abandoned by disconnect() method call"
)

async def send_message(self, message: str):
session_id = await self.session_id()
if self.logger.level <= logging.DEBUG:
self.logger.debug(f"Sending a message: {message}")
self.logger.debug(
f"Sending a message: {message} from session: {session_id}"
)
try:
await self.current_session.send_str(message)
except ConnectionError as e:
# We rarely get this exception while replacing the underlying WebSocket connections.
# We can do one more try here as the self.current_session should be ready now.
if self.logger.level <= logging.DEBUG:
self.logger.debug(
f"Failed to send a message (error: {e}, message: {message})"
f"Failed to send a message (error: {e}, message: {message}, session: {session_id})"
" as the underlying connection was replaced. Retrying the same request only one time..."
)
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
Expand All @@ -317,7 +442,8 @@ async def send_message(self, message: str):
await self.current_session.send_str(message)
else:
self.logger.warning(
"The current session is no longer active. Failed to send a message"
f"The current session ({session_id}) is no longer active. "
"Failed to send a message"
)
raise e
finally:
Expand All @@ -336,3 +462,9 @@ async def close(self):
self.message_receiver.cancel()
if self.aiohttp_client_session is not None:
await self.aiohttp_client_session.close()

@classmethod
def build_session_id(cls, session: ClientWebSocketResponse) -> str:
if session is None:
return None
return "s_" + str(hash(session))