diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml index ca59c11ac..15ad9e500 100644 --- a/.github/workflows/ci-build.yml +++ b/.github/workflows/ci-build.yml @@ -16,7 +16,7 @@ jobs: python-version: ['3.6', '3.7', '3.8', '3.9'] env: PYTHON_SLACK_SDK_MOCK_SERVER_MODE: 'threading' - CI_UNSTABLE_TESTS_SKIP_ENABLED: '1' + #CI_UNSTABLE_TESTS_SKIP_ENABLED: '1' steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} diff --git a/slack_sdk/socket_mode/aiohttp/__init__.py b/slack_sdk/socket_mode/aiohttp/__init__.py index f72546b65..87f251a26 100644 --- a/slack_sdk/socket_mode/aiohttp/__init__.py +++ b/slack_sdk/socket_mode/aiohttp/__init__.py @@ -136,59 +136,116 @@ def __init__( self.message_processor = asyncio.ensure_future(self.process_messages()) async def monitor_current_session(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: ClientWebSocketResponse = self.current_session + session_id: str = self.build_session_id(session) + + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() execution loop for {session_id} started" + ) try: + logging_interval = 100 + counter_for_logging = 0 + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" + ) + break try: + # The logging here is for detailed trouble shooting of potential issues in this client. + # If you don't see this log for a while, it means that + # this receive_messages execution is no longer working for some reason. + counter_for_logging = (counter_for_logging + 1) % logging_interval + if counter_for_logging == 0: + log_message = ( + f"{logging_interval} session verification executed after the previous same log" + f" ({session_id})" + ) + self.logger.debug(log_message) + await asyncio.sleep(self.ping_interval) - if self.current_session is not None: + + if session is not None and session.closed is False: t = time.time() if self.last_ping_pong_time is None: self.last_ping_pong_time = float(t) - await self.current_session.ping(f"sdk-ping-pong:{t}") + try: + await session.ping(f"sdk-ping-pong:{t}") + except Exception as e: + # The ping() method can fail for some reason. + # To establish a new connection even in this scenario, + # we ignore the exception here. + self.logger.warning( + f"Failed to send a ping message ({session_id}): {e}" + ) if self.auto_reconnect_enabled: should_reconnect = False - if self.current_session is None or self.current_session.closed: + if session is None or session.closed: self.logger.info( - "The session seems to be already closed. Reconnecting..." + f"The session ({session_id}) seems to be already closed. Reconnecting..." ) should_reconnect = True - if self.last_ping_pong_time is not None: + if await self.is_ping_pong_failing(): disconnected_seconds = int( time.time() - self.last_ping_pong_time ) - if disconnected_seconds >= (self.ping_interval * 4): - self.logger.info( - "The connection seems to be stale. Reconnecting..." - f" reason: disconnected for {disconnected_seconds}+ seconds)" - ) - self.stale = True - self.last_ping_pong_time = None - should_reconnect = True + self.logger.info( + f"The session ({session_id}) seems to be stale. Reconnecting..." + f" reason: disconnected for {disconnected_seconds}+ seconds)" + ) + self.stale = True + self.last_ping_pong_time = None + should_reconnect = True if should_reconnect is True or not await self.is_connected(): await self.connect_to_new_endpoint() except Exception as e: self.logger.error( - "Failed to check the current session or reconnect to the server " + f"Failed to check the current session ({session_id}) or reconnect to the server " f"(error: {type(e).__name__}, message: {e})" ) except asyncio.CancelledError: - if self.trace_enabled: + if self.logger.level <= logging.DEBUG: self.logger.debug( - "The running monitor_current_session task is now cancelled" + f"The monitor_current_session task for {session_id} is now cancelled" ) raise async def receive_messages(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session = self.current_session + session_id = self.build_session_id(session) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() execution loop with {session_id} started" + ) try: consecutive_error_count = 0 + logging_interval = 100 + counter_for_logging = 0 + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) + break try: - message: WSMessage = await self.current_session.receive() + message: WSMessage = await session.receive() if self.trace_enabled and self.logger.level <= logging.DEBUG: + # The following logging prints every single received message except empty message data ones. type = WSMsgType(message.type) message_type = type.name if type is not None else message.type message_data = message.data @@ -197,8 +254,27 @@ async def receive_messages(self) -> None: if len(message_data) > 0: # To skip the empty message that Slack server-side often sends self.logger.debug( - f"Received message (type: {message_type}, data: {message_data}, extra: {message.extra})" + f"Received message " + f"(type: {message_type}, " + f"data: {message_data}, " + f"extra: {message.extra}, " + f"session: {session_id})" ) + + # The logging here is for detailed trouble shooting of potential issues in this client. + # If you don't see this log for a while, it means that + # this receive_messages execution is no longer working for some reason. + if self.logger.level <= logging.DEBUG: + counter_for_logging = ( + counter_for_logging + 1 + ) % logging_interval + if counter_for_logging == 0: + log_message = ( + f"{logging_interval} WebSocket messages received " + f"after the previous same log ({session_id})" + ) + self.logger.debug(log_message) + if message is not None: if message.type == WSMsgType.TEXT: message_data = message.data @@ -208,7 +284,7 @@ async def receive_messages(self) -> None: elif message.type == WSMsgType.CLOSE: if self.auto_reconnect_enabled: self.logger.info( - "Received CLOSE event. Reconnecting..." + f"Received CLOSE event from {session_id}. Reconnecting..." ) await self.connect_to_new_endpoint() for listener in self.on_close_listeners: @@ -220,7 +296,7 @@ async def receive_messages(self) -> None: await asyncio.sleep(self.ping_interval) continue elif message.type == WSMsgType.PING: - await self.current_session.pong(message.data) + await session.pong(message.data) continue elif message.type == WSMsgType.PONG: if message.data is not None: @@ -235,31 +311,56 @@ async def receive_messages(self) -> None: except Exception as e: self.logger.warning( f"Failed to parse the last_ping_pong_time value from {str_message_data}" - f" - error : {e}" + f" - error : {e}, session: {session_id}" ) continue consecutive_error_count = 0 except Exception as e: consecutive_error_count += 1 self.logger.error( - f"Failed to receive or enqueue a message: {type(e).__name__}, {e}" + f"Failed to receive or enqueue a message: {type(e).__name__}, {e} ({session_id})" ) if isinstance(e, ClientConnectionError): await asyncio.sleep(self.ping_interval) else: await asyncio.sleep(consecutive_error_count) except asyncio.CancelledError: - if self.trace_enabled: - self.logger.debug("The running receive_messages task is now cancelled") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) raise + async def is_ping_pong_failing(self) -> bool: + if self.last_ping_pong_time is None: + return False + disconnected_seconds = int(time.time() - self.last_ping_pong_time) + return disconnected_seconds >= (self.ping_interval * 4) + async def is_connected(self) -> bool: - return ( + connected: bool = ( not self.closed and not self.stale and self.current_session is not None and not self.current_session.closed + and not await self.is_ping_pong_failing() ) + if connected is False and self.logger.level <= logging.DEBUG: + is_ping_pong_failing = await self.is_ping_pong_failing() + session_id = await self.session_id() + self.logger.debug( + "Inactive connection detected (" + f"session_id: {session_id}, " + f"closed: {self.closed}, " + f"stale: {self.stale}, " + f"current_session.closed: {self.current_session.closed}, " + f"is_ping_pong_failing: {is_ping_pong_failing}" + ")" + ) + return connected + + async def session_id(self) -> str: + return self.build_session_id(self.current_session) async def connect(self): old_session = None if self.current_session is None else self.current_session @@ -271,9 +372,18 @@ async def connect(self): heartbeat=self.ping_interval, proxy=self.proxy, ) + session_id: str = await self.session_id() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled self.stale = False - self.logger.info("A new session has been established") + self.logger.info(f"A new session ({session_id}) has been established") + + # The first ping from the new connection + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Sending a ping message with the newly established connection ({session_id})..." + ) + t = time.time() + await self.current_session.ping(f"sdk-ping-pong:{t}") if self.current_session_monitor is not None: self.current_session_monitor.cancel() @@ -281,24 +391,39 @@ async def connect(self): self.current_session_monitor = asyncio.ensure_future( self.monitor_current_session() ) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() executor has been recreated for {session_id}" + ) if self.message_receiver is not None: self.message_receiver.cancel() self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() executor has been recreated for {session_id}" + ) if old_session is not None: await old_session.close() - self.logger.info("The old session has been abandoned") + old_session_id = self.build_session_id(old_session) + self.logger.info(f"The old session ({old_session_id}) has been abandoned") async def disconnect(self): if self.current_session is not None: await self.current_session.close() - self.logger.info("The session has been abandoned") + session_id = await self.session_id() + self.logger.info( + f"The current session ({session_id}) has been abandoned by disconnect() method call" + ) async def send_message(self, message: str): + session_id = await self.session_id() if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Sending a message: {message}") + self.logger.debug( + f"Sending a message: {message} from session: {session_id}" + ) try: await self.current_session.send_str(message) except ConnectionError as e: @@ -306,7 +431,7 @@ async def send_message(self, message: str): # We can do one more try here as the self.current_session should be ready now. if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Failed to send a message (error: {e}, message: {message})" + f"Failed to send a message (error: {e}, message: {message}, session: {session_id})" " as the underlying connection was replaced. Retrying the same request only one time..." ) # Although acquiring self.connect_operation_lock also for the first method call is the safest way, @@ -317,7 +442,8 @@ async def send_message(self, message: str): await self.current_session.send_str(message) else: self.logger.warning( - "The current session is no longer active. Failed to send a message" + f"The current session ({session_id}) is no longer active. " + "Failed to send a message" ) raise e finally: @@ -336,3 +462,9 @@ async def close(self): self.message_receiver.cancel() if self.aiohttp_client_session is not None: await self.aiohttp_client_session.close() + + @classmethod + def build_session_id(cls, session: ClientWebSocketResponse) -> str: + if session is None: + return None + return "s_" + str(hash(session)) diff --git a/slack_sdk/socket_mode/async_client.py b/slack_sdk/socket_mode/async_client.py index 4a1b4d8b2..26cae2f5a 100644 --- a/slack_sdk/socket_mode/async_client.py +++ b/slack_sdk/socket_mode/async_client.py @@ -64,6 +64,9 @@ async def issue_new_wss_url(self) -> str: async def is_connected(self) -> bool: return False + async def session_id(self) -> str: + return "" + async def connect(self): raise NotImplementedError() @@ -71,11 +74,12 @@ async def disconnect(self): raise NotImplementedError() async def connect_to_new_endpoint(self, force: bool = False): + session_id = await self.session_id() try: await self.connect_operation_lock.acquire() if self.trace_enabled: self.logger.debug( - "For reconnection, the connect_operation_lock was acquired" + f"For reconnection, the connect_operation_lock was acquired (session: {session_id})" ) if force or not await self.is_connected(): self.wss_uri = await self.issue_new_wss_url() @@ -85,7 +89,7 @@ async def connect_to_new_endpoint(self, force: bool = False): self.connect_operation_lock.release() if self.trace_enabled: self.logger.debug( - "The connect_operation_lock for reconnection was released" + f"The connect_operation_lock for reconnection was released (session: {session_id})" ) async def close(self): @@ -107,20 +111,26 @@ async def enqueue_message(self, message: str): await self.message_queue.put(message) if self.logger.level <= logging.DEBUG: queue_size = self.message_queue.qsize() + session_id = await self.session_id() self.logger.debug( - f"A new message enqueued (current queue size: {queue_size})" + f"A new message enqueued (current queue size: {queue_size}, session: {session_id})" ) async def process_messages(self): + session_id = await self.session_id() try: while not self.closed: try: await self.process_message() except Exception as e: - self.logger.exception(f"Failed to process a message: {e}") + self.logger.exception( + f"Failed to process a message: {e}, session: {session_id}" + ) except asyncio.CancelledError: if self.trace_enabled: - self.logger.debug("The running process_messages task is now cancelled") + self.logger.debug( + f"The running process_messages task for {session_id} is now cancelled" + ) raise async def process_message(self): @@ -134,10 +144,11 @@ async def process_message(self): ) async def run_message_listeners(self, message: dict, raw_message: str) -> None: + session_id = await self.session_id() type, envelope_id = message.get("type"), message.get("envelope_id") if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Message processing started (type: {type}, envelope_id: {envelope_id})" + f"Message processing started (type: {type}, envelope_id: {envelope_id}, session: {session_id})" ) try: if message.get("type") == "disconnect": @@ -148,7 +159,9 @@ async def run_message_listeners(self, message: dict, raw_message: str) -> None: try: await listener(self, message, raw_message) except Exception as e: - self.logger.exception(f"Failed to run a message listener: {e}") + self.logger.exception( + f"Failed to run a message listener: {e}, session: {session_id}" + ) if len(self.socket_mode_request_listeners) > 0: request = SocketModeRequest.from_dict(message) @@ -158,12 +171,17 @@ async def run_message_listeners(self, message: dict, raw_message: str) -> None: await listener(self, request) except Exception as e: self.logger.exception( - f"Failed to run a request listener: {e}" + f"Failed to run a request listener: {e}, session: {session_id}" ) except Exception as e: - self.logger.exception(f"Failed to run message listeners: {e}") + self.logger.exception( + f"Failed to run message listeners: {e}, session: {session_id}" + ) finally: if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Message processing completed (type: {type}, envelope_id: {envelope_id})" + f"Message processing completed (" + f"type: {type}, " + f"envelope_id: {envelope_id}, " + f"session: {session_id})" ) diff --git a/slack_sdk/socket_mode/websockets/__init__.py b/slack_sdk/socket_mode/websockets/__init__.py index 7cf45ac4d..90af0fc64 100644 --- a/slack_sdk/socket_mode/websockets/__init__.py +++ b/slack_sdk/socket_mode/websockets/__init__.py @@ -13,8 +13,8 @@ from typing import Union, Optional, List, Callable, Awaitable import websockets -from websockets.client import WebSocketClientProtocol from websockets.exceptions import WebSocketException +from websockets.legacy.client import WebSocketClientProtocol from slack_sdk.socket_mode.async_client import AsyncBaseSocketModeClient from slack_sdk.socket_mode.async_listeners import ( @@ -100,43 +100,89 @@ def __init__( self.message_processor = asyncio.ensure_future(self.process_messages()) async def monitor_current_session(self) -> None: - while not self.closed: - await asyncio.sleep(self.ping_interval) - try: - if self.auto_reconnect_enabled and ( - self.current_session is None or self.current_session.closed - ): - self.logger.info( - "The session seems to be already closed. Reconnecting..." + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: WebSocketClientProtocol = self.current_session + session_id: str = await self.session_id() + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() execution loop for {session_id} started" + ) + try: + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" + ) + break + await asyncio.sleep(self.ping_interval) + try: + if self.auto_reconnect_enabled and ( + session is None or session.closed + ): + self.logger.info( + f"The session ({session_id}) seems to be already closed. Reconnecting..." + ) + await self.connect_to_new_endpoint() + except Exception as e: + self.logger.error( + "Failed to check the current session or reconnect to the server " + f"(error: {type(e).__name__}, message: {e}, session: {session_id})" ) - await self.connect_to_new_endpoint() - except Exception as e: - self.logger.error( - "Failed to check the current session or reconnect to the server " - f"(error: {type(e).__name__}, message: {e})" + except asyncio.CancelledError: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" ) + raise async def receive_messages(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: WebSocketClientProtocol = self.current_session + session_id: str = await self.session_id() consecutive_error_count = 0 - while not self.closed: - try: - message = await self.current_session.recv() - if message is not None: - if isinstance(message, bytes): - message = message.decode("utf-8") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() execution loop with {session_id} started" + ) + try: + while not self.closed: + if session != self.current_session: if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Received message: {message}") - await self.enqueue_message(message) - consecutive_error_count = 0 - except Exception as e: - consecutive_error_count += 1 - self.logger.error( - f"Failed to receive or enqueue a message: {type(e).__name__}, {e}" + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) + break + try: + message = await session.recv() + if message is not None: + if isinstance(message, bytes): + message = message.decode("utf-8") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Received message: {message}, session: {session_id}" + ) + await self.enqueue_message(message) + consecutive_error_count = 0 + except Exception as e: + consecutive_error_count += 1 + self.logger.error( + f"Failed to receive or enqueue a message: {type(e).__name__}, error: {e}, session: {session_id}" + ) + if isinstance(e, websockets.ConnectionClosedError): + await asyncio.sleep(self.ping_interval) + else: + await asyncio.sleep(consecutive_error_count) + except asyncio.CancelledError: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" ) - if isinstance(e, websockets.ConnectionClosedError): - await asyncio.sleep(self.ping_interval) - else: - await asyncio.sleep(consecutive_error_count) + raise async def is_connected(self) -> bool: return ( @@ -145,6 +191,9 @@ async def is_connected(self) -> bool: and not self.current_session.closed ) + async def session_id(self) -> str: + return self.build_session_id(self.current_session) + async def connect(self): if self.wss_uri is None: self.wss_uri = await self.issue_new_wss_url() @@ -156,36 +205,52 @@ async def connect(self): uri=self.wss_uri, ping_interval=self.ping_interval, ) + session_id = await self.session_id() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled - self.logger.info("A new session has been established") + self.logger.info(f"A new session ({session_id}) has been established") - if self.current_session_monitor is None: - self.current_session_monitor = asyncio.ensure_future( - self.monitor_current_session() + if self.current_session_monitor is not None: + self.current_session_monitor.cancel() + self.current_session_monitor = asyncio.ensure_future( + self.monitor_current_session() + ) + + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() executor has been recreated for {session_id}" ) - if self.message_receiver is None: - self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.message_receiver is not None: + self.message_receiver.cancel() + self.message_receiver = asyncio.ensure_future(self.receive_messages()) + + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() executor has been recreated for {session_id}" + ) if old_session is not None: await old_session.close() - self.logger.info("The old session has been abandoned") + old_session_id = self.build_session_id(old_session) + self.logger.info(f"The old session ({old_session_id}) has been abandoned") async def disconnect(self): if self.current_session is not None: await self.current_session.close() async def send_message(self, message: str): + session = self.current_session + session_id = self.build_session_id(session) if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Sending a message: {message}") + self.logger.debug(f"Sending a message: {message}, session: {session_id}") try: - await self.current_session.send(message) + await session.send(message) except WebSocketException as e: # We rarely get this exception while replacing the underlying WebSocket connections. # We can do one more try here as the self.current_session should be ready now. if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Failed to send a message (error: {e}, message: {message})" + f"Failed to send a message (error: {e}, message: {message}, session: {session_id})" " as the underlying connection was replaced. Retrying the same request only one time..." ) # Although acquiring self.connect_operation_lock also for the first method call is the safest way, @@ -195,7 +260,7 @@ async def send_message(self, message: str): await self.current_session.send(message) else: self.logger.warning( - "The current session is no longer active. Failed to send a message" + f"The current session ({session_id}) is no longer active. Failed to send a message" ) raise e finally: @@ -211,3 +276,9 @@ async def close(self): self.current_session_monitor.cancel() if self.message_receiver is not None: self.message_receiver.cancel() + + @classmethod + def build_session_id(cls, session: WebSocketClientProtocol) -> str: + if session is None: + return None + return "s_" + str(hash(session))