diff --git a/docs/api-docs/slack_sdk/socket_mode/aiohttp/index.html b/docs/api-docs/slack_sdk/socket_mode/aiohttp/index.html index bd5eeed00..dc9a33ad5 100644 --- a/docs/api-docs/slack_sdk/socket_mode/aiohttp/index.html +++ b/docs/api-docs/slack_sdk/socket_mode/aiohttp/index.html @@ -170,59 +170,116 @@

Module slack_sdk.socket_mode.aiohttp

self.message_processor = asyncio.ensure_future(self.process_messages()) async def monitor_current_session(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: ClientWebSocketResponse = self.current_session + session_id: str = self.build_session_id(session) + + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() execution loop for {session_id} started" + ) try: + logging_interval = 100 + counter_for_logging = 0 + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" + ) + break try: + # The logging here is for detailed trouble shooting of potential issues in this client. + # If you don't see this log for a while, it means that + # this receive_messages execution is no longer working for some reason. + counter_for_logging = (counter_for_logging + 1) % logging_interval + if counter_for_logging == 0: + log_message = ( + f"{logging_interval} session verification executed after the previous same log" + f" ({session_id})" + ) + self.logger.debug(log_message) + await asyncio.sleep(self.ping_interval) - if self.current_session is not None: + + if session is not None and session.closed is False: t = time.time() if self.last_ping_pong_time is None: self.last_ping_pong_time = float(t) - await self.current_session.ping(f"sdk-ping-pong:{t}") + try: + await session.ping(f"sdk-ping-pong:{t}") + except Exception as e: + # The ping() method can fail for some reason. + # To establish a new connection even in this scenario, + # we ignore the exception here. + self.logger.warning( + f"Failed to send a ping message ({session_id}): {e}" + ) if self.auto_reconnect_enabled: should_reconnect = False - if self.current_session is None or self.current_session.closed: + if session is None or session.closed: self.logger.info( - "The session seems to be already closed. Reconnecting..." + f"The session ({session_id}) seems to be already closed. Reconnecting..." ) should_reconnect = True - if self.last_ping_pong_time is not None: + if await self.is_ping_pong_failing(): disconnected_seconds = int( time.time() - self.last_ping_pong_time ) - if disconnected_seconds >= (self.ping_interval * 4): - self.logger.info( - "The connection seems to be stale. Reconnecting..." - f" reason: disconnected for {disconnected_seconds}+ seconds)" - ) - self.stale = True - self.last_ping_pong_time = None - should_reconnect = True + self.logger.info( + f"The session ({session_id}) seems to be stale. Reconnecting..." + f" reason: disconnected for {disconnected_seconds}+ seconds)" + ) + self.stale = True + self.last_ping_pong_time = None + should_reconnect = True if should_reconnect is True or not await self.is_connected(): await self.connect_to_new_endpoint() except Exception as e: self.logger.error( - "Failed to check the current session or reconnect to the server " + f"Failed to check the current session ({session_id}) or reconnect to the server " f"(error: {type(e).__name__}, message: {e})" ) except asyncio.CancelledError: - if self.trace_enabled: + if self.logger.level <= logging.DEBUG: self.logger.debug( - "The running monitor_current_session task is now cancelled" + f"The monitor_current_session task for {session_id} is now cancelled" ) raise async def receive_messages(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session = self.current_session + session_id = self.build_session_id(session) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() execution loop with {session_id} started" + ) try: consecutive_error_count = 0 + logging_interval = 100 + counter_for_logging = 0 + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) + break try: - message: WSMessage = await self.current_session.receive() + message: WSMessage = await session.receive() if self.trace_enabled and self.logger.level <= logging.DEBUG: + # The following logging prints every single received message except empty message data ones. type = WSMsgType(message.type) message_type = type.name if type is not None else message.type message_data = message.data @@ -231,8 +288,27 @@

Module slack_sdk.socket_mode.aiohttp

if len(message_data) > 0: # To skip the empty message that Slack server-side often sends self.logger.debug( - f"Received message (type: {message_type}, data: {message_data}, extra: {message.extra})" + f"Received message " + f"(type: {message_type}, " + f"data: {message_data}, " + f"extra: {message.extra}, " + f"session: {session_id})" + ) + + # The logging here is for detailed trouble shooting of potential issues in this client. + # If you don't see this log for a while, it means that + # this receive_messages execution is no longer working for some reason. + if self.logger.level <= logging.DEBUG: + counter_for_logging = ( + counter_for_logging + 1 + ) % logging_interval + if counter_for_logging == 0: + log_message = ( + f"{logging_interval} WebSocket messages received " + f"after the previous same log ({session_id})" ) + self.logger.debug(log_message) + if message is not None: if message.type == WSMsgType.TEXT: message_data = message.data @@ -242,7 +318,7 @@

Module slack_sdk.socket_mode.aiohttp

elif message.type == WSMsgType.CLOSE: if self.auto_reconnect_enabled: self.logger.info( - "Received CLOSE event. Reconnecting..." + f"Received CLOSE event from {session_id}. Reconnecting..." ) await self.connect_to_new_endpoint() for listener in self.on_close_listeners: @@ -254,7 +330,7 @@

Module slack_sdk.socket_mode.aiohttp

await asyncio.sleep(self.ping_interval) continue elif message.type == WSMsgType.PING: - await self.current_session.pong(message.data) + await session.pong(message.data) continue elif message.type == WSMsgType.PONG: if message.data is not None: @@ -269,31 +345,56 @@

Module slack_sdk.socket_mode.aiohttp

except Exception as e: self.logger.warning( f"Failed to parse the last_ping_pong_time value from {str_message_data}" - f" - error : {e}" + f" - error : {e}, session: {session_id}" ) continue consecutive_error_count = 0 except Exception as e: consecutive_error_count += 1 self.logger.error( - f"Failed to receive or enqueue a message: {type(e).__name__}, {e}" + f"Failed to receive or enqueue a message: {type(e).__name__}, {e} ({session_id})" ) if isinstance(e, ClientConnectionError): await asyncio.sleep(self.ping_interval) else: await asyncio.sleep(consecutive_error_count) except asyncio.CancelledError: - if self.trace_enabled: - self.logger.debug("The running receive_messages task is now cancelled") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) raise + async def is_ping_pong_failing(self) -> bool: + if self.last_ping_pong_time is None: + return False + disconnected_seconds = int(time.time() - self.last_ping_pong_time) + return disconnected_seconds >= (self.ping_interval * 4) + async def is_connected(self) -> bool: - return ( + connected: bool = ( not self.closed and not self.stale and self.current_session is not None and not self.current_session.closed + and not await self.is_ping_pong_failing() ) + if connected is False and self.logger.level <= logging.DEBUG: + is_ping_pong_failing = await self.is_ping_pong_failing() + session_id = await self.session_id() + self.logger.debug( + "Inactive connection detected (" + f"session_id: {session_id}, " + f"closed: {self.closed}, " + f"stale: {self.stale}, " + f"current_session.closed: {self.current_session.closed}, " + f"is_ping_pong_failing: {is_ping_pong_failing}" + ")" + ) + return connected + + async def session_id(self) -> str: + return self.build_session_id(self.current_session) async def connect(self): old_session = None if self.current_session is None else self.current_session @@ -305,9 +406,18 @@

Module slack_sdk.socket_mode.aiohttp

heartbeat=self.ping_interval, proxy=self.proxy, ) + session_id: str = await self.session_id() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled self.stale = False - self.logger.info("A new session has been established") + self.logger.info(f"A new session ({session_id}) has been established") + + # The first ping from the new connection + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Sending a ping message with the newly established connection ({session_id})..." + ) + t = time.time() + await self.current_session.ping(f"sdk-ping-pong:{t}") if self.current_session_monitor is not None: self.current_session_monitor.cancel() @@ -315,24 +425,39 @@

Module slack_sdk.socket_mode.aiohttp

self.current_session_monitor = asyncio.ensure_future( self.monitor_current_session() ) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() executor has been recreated for {session_id}" + ) if self.message_receiver is not None: self.message_receiver.cancel() self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() executor has been recreated for {session_id}" + ) if old_session is not None: await old_session.close() - self.logger.info("The old session has been abandoned") + old_session_id = self.build_session_id(old_session) + self.logger.info(f"The old session ({old_session_id}) has been abandoned") async def disconnect(self): if self.current_session is not None: await self.current_session.close() - self.logger.info("The session has been abandoned") + session_id = await self.session_id() + self.logger.info( + f"The current session ({session_id}) has been abandoned by disconnect() method call" + ) async def send_message(self, message: str): + session_id = await self.session_id() if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Sending a message: {message}") + self.logger.debug( + f"Sending a message: {message} from session: {session_id}" + ) try: await self.current_session.send_str(message) except ConnectionError as e: @@ -340,7 +465,7 @@

Module slack_sdk.socket_mode.aiohttp

# We can do one more try here as the self.current_session should be ready now. if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Failed to send a message (error: {e}, message: {message})" + f"Failed to send a message (error: {e}, message: {message}, session: {session_id})" " as the underlying connection was replaced. Retrying the same request only one time..." ) # Although acquiring self.connect_operation_lock also for the first method call is the safest way, @@ -351,7 +476,8 @@

Module slack_sdk.socket_mode.aiohttp

await self.current_session.send_str(message) else: self.logger.warning( - "The current session is no longer active. Failed to send a message" + f"The current session ({session_id}) is no longer active. " + "Failed to send a message" ) raise e finally: @@ -369,7 +495,13 @@

Module slack_sdk.socket_mode.aiohttp

if self.message_receiver is not None: self.message_receiver.cancel() if self.aiohttp_client_session is not None: - await self.aiohttp_client_session.close() + await self.aiohttp_client_session.close() + + @classmethod + def build_session_id(cls, session: ClientWebSocketResponse) -> str: + if session is None: + return None + return "s_" + str(hash(session))
@@ -524,59 +656,116 @@

Args

self.message_processor = asyncio.ensure_future(self.process_messages()) async def monitor_current_session(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: ClientWebSocketResponse = self.current_session + session_id: str = self.build_session_id(session) + + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() execution loop for {session_id} started" + ) try: + logging_interval = 100 + counter_for_logging = 0 + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" + ) + break try: + # The logging here is for detailed trouble shooting of potential issues in this client. + # If you don't see this log for a while, it means that + # this receive_messages execution is no longer working for some reason. + counter_for_logging = (counter_for_logging + 1) % logging_interval + if counter_for_logging == 0: + log_message = ( + f"{logging_interval} session verification executed after the previous same log" + f" ({session_id})" + ) + self.logger.debug(log_message) + await asyncio.sleep(self.ping_interval) - if self.current_session is not None: + + if session is not None and session.closed is False: t = time.time() if self.last_ping_pong_time is None: self.last_ping_pong_time = float(t) - await self.current_session.ping(f"sdk-ping-pong:{t}") + try: + await session.ping(f"sdk-ping-pong:{t}") + except Exception as e: + # The ping() method can fail for some reason. + # To establish a new connection even in this scenario, + # we ignore the exception here. + self.logger.warning( + f"Failed to send a ping message ({session_id}): {e}" + ) if self.auto_reconnect_enabled: should_reconnect = False - if self.current_session is None or self.current_session.closed: + if session is None or session.closed: self.logger.info( - "The session seems to be already closed. Reconnecting..." + f"The session ({session_id}) seems to be already closed. Reconnecting..." ) should_reconnect = True - if self.last_ping_pong_time is not None: + if await self.is_ping_pong_failing(): disconnected_seconds = int( time.time() - self.last_ping_pong_time ) - if disconnected_seconds >= (self.ping_interval * 4): - self.logger.info( - "The connection seems to be stale. Reconnecting..." - f" reason: disconnected for {disconnected_seconds}+ seconds)" - ) - self.stale = True - self.last_ping_pong_time = None - should_reconnect = True + self.logger.info( + f"The session ({session_id}) seems to be stale. Reconnecting..." + f" reason: disconnected for {disconnected_seconds}+ seconds)" + ) + self.stale = True + self.last_ping_pong_time = None + should_reconnect = True if should_reconnect is True or not await self.is_connected(): await self.connect_to_new_endpoint() except Exception as e: self.logger.error( - "Failed to check the current session or reconnect to the server " + f"Failed to check the current session ({session_id}) or reconnect to the server " f"(error: {type(e).__name__}, message: {e})" ) except asyncio.CancelledError: - if self.trace_enabled: + if self.logger.level <= logging.DEBUG: self.logger.debug( - "The running monitor_current_session task is now cancelled" + f"The monitor_current_session task for {session_id} is now cancelled" ) raise async def receive_messages(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session = self.current_session + session_id = self.build_session_id(session) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() execution loop with {session_id} started" + ) try: consecutive_error_count = 0 + logging_interval = 100 + counter_for_logging = 0 + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) + break try: - message: WSMessage = await self.current_session.receive() + message: WSMessage = await session.receive() if self.trace_enabled and self.logger.level <= logging.DEBUG: + # The following logging prints every single received message except empty message data ones. type = WSMsgType(message.type) message_type = type.name if type is not None else message.type message_data = message.data @@ -585,8 +774,27 @@

Args

if len(message_data) > 0: # To skip the empty message that Slack server-side often sends self.logger.debug( - f"Received message (type: {message_type}, data: {message_data}, extra: {message.extra})" + f"Received message " + f"(type: {message_type}, " + f"data: {message_data}, " + f"extra: {message.extra}, " + f"session: {session_id})" + ) + + # The logging here is for detailed trouble shooting of potential issues in this client. + # If you don't see this log for a while, it means that + # this receive_messages execution is no longer working for some reason. + if self.logger.level <= logging.DEBUG: + counter_for_logging = ( + counter_for_logging + 1 + ) % logging_interval + if counter_for_logging == 0: + log_message = ( + f"{logging_interval} WebSocket messages received " + f"after the previous same log ({session_id})" ) + self.logger.debug(log_message) + if message is not None: if message.type == WSMsgType.TEXT: message_data = message.data @@ -596,7 +804,7 @@

Args

elif message.type == WSMsgType.CLOSE: if self.auto_reconnect_enabled: self.logger.info( - "Received CLOSE event. Reconnecting..." + f"Received CLOSE event from {session_id}. Reconnecting..." ) await self.connect_to_new_endpoint() for listener in self.on_close_listeners: @@ -608,7 +816,7 @@

Args

await asyncio.sleep(self.ping_interval) continue elif message.type == WSMsgType.PING: - await self.current_session.pong(message.data) + await session.pong(message.data) continue elif message.type == WSMsgType.PONG: if message.data is not None: @@ -623,31 +831,56 @@

Args

except Exception as e: self.logger.warning( f"Failed to parse the last_ping_pong_time value from {str_message_data}" - f" - error : {e}" + f" - error : {e}, session: {session_id}" ) continue consecutive_error_count = 0 except Exception as e: consecutive_error_count += 1 self.logger.error( - f"Failed to receive or enqueue a message: {type(e).__name__}, {e}" + f"Failed to receive or enqueue a message: {type(e).__name__}, {e} ({session_id})" ) if isinstance(e, ClientConnectionError): await asyncio.sleep(self.ping_interval) else: await asyncio.sleep(consecutive_error_count) except asyncio.CancelledError: - if self.trace_enabled: - self.logger.debug("The running receive_messages task is now cancelled") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) raise + async def is_ping_pong_failing(self) -> bool: + if self.last_ping_pong_time is None: + return False + disconnected_seconds = int(time.time() - self.last_ping_pong_time) + return disconnected_seconds >= (self.ping_interval * 4) + async def is_connected(self) -> bool: - return ( + connected: bool = ( not self.closed and not self.stale and self.current_session is not None and not self.current_session.closed + and not await self.is_ping_pong_failing() ) + if connected is False and self.logger.level <= logging.DEBUG: + is_ping_pong_failing = await self.is_ping_pong_failing() + session_id = await self.session_id() + self.logger.debug( + "Inactive connection detected (" + f"session_id: {session_id}, " + f"closed: {self.closed}, " + f"stale: {self.stale}, " + f"current_session.closed: {self.current_session.closed}, " + f"is_ping_pong_failing: {is_ping_pong_failing}" + ")" + ) + return connected + + async def session_id(self) -> str: + return self.build_session_id(self.current_session) async def connect(self): old_session = None if self.current_session is None else self.current_session @@ -659,9 +892,18 @@

Args

heartbeat=self.ping_interval, proxy=self.proxy, ) + session_id: str = await self.session_id() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled self.stale = False - self.logger.info("A new session has been established") + self.logger.info(f"A new session ({session_id}) has been established") + + # The first ping from the new connection + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Sending a ping message with the newly established connection ({session_id})..." + ) + t = time.time() + await self.current_session.ping(f"sdk-ping-pong:{t}") if self.current_session_monitor is not None: self.current_session_monitor.cancel() @@ -669,24 +911,39 @@

Args

self.current_session_monitor = asyncio.ensure_future( self.monitor_current_session() ) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() executor has been recreated for {session_id}" + ) if self.message_receiver is not None: self.message_receiver.cancel() self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() executor has been recreated for {session_id}" + ) if old_session is not None: await old_session.close() - self.logger.info("The old session has been abandoned") + old_session_id = self.build_session_id(old_session) + self.logger.info(f"The old session ({old_session_id}) has been abandoned") async def disconnect(self): if self.current_session is not None: await self.current_session.close() - self.logger.info("The session has been abandoned") + session_id = await self.session_id() + self.logger.info( + f"The current session ({session_id}) has been abandoned by disconnect() method call" + ) async def send_message(self, message: str): + session_id = await self.session_id() if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Sending a message: {message}") + self.logger.debug( + f"Sending a message: {message} from session: {session_id}" + ) try: await self.current_session.send_str(message) except ConnectionError as e: @@ -694,7 +951,7 @@

Args

# We can do one more try here as the self.current_session should be ready now. if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Failed to send a message (error: {e}, message: {message})" + f"Failed to send a message (error: {e}, message: {message}, session: {session_id})" " as the underlying connection was replaced. Retrying the same request only one time..." ) # Although acquiring self.connect_operation_lock also for the first method call is the safest way, @@ -705,7 +962,8 @@

Args

await self.current_session.send_str(message) else: self.logger.warning( - "The current session is no longer active. Failed to send a message" + f"The current session ({session_id}) is no longer active. " + "Failed to send a message" ) raise e finally: @@ -723,7 +981,13 @@

Args

if self.message_receiver is not None: self.message_receiver.cancel() if self.aiohttp_client_session is not None: - await self.aiohttp_client_session.close() + await self.aiohttp_client_session.close() + + @classmethod + def build_session_id(cls, session: ClientWebSocketResponse) -> str: + if session is None: + return None + return "s_" + str(hash(session))

Ancestors

@@ -263,6 +281,9 @@

Classes

async def is_connected(self) -> bool: return False + async def session_id(self) -> str: + return "" + async def connect(self): raise NotImplementedError() @@ -270,11 +291,12 @@

Classes

raise NotImplementedError() async def connect_to_new_endpoint(self, force: bool = False): + session_id = await self.session_id() try: await self.connect_operation_lock.acquire() if self.trace_enabled: self.logger.debug( - "For reconnection, the connect_operation_lock was acquired" + f"For reconnection, the connect_operation_lock was acquired (session: {session_id})" ) if force or not await self.is_connected(): self.wss_uri = await self.issue_new_wss_url() @@ -284,7 +306,7 @@

Classes

self.connect_operation_lock.release() if self.trace_enabled: self.logger.debug( - "The connect_operation_lock for reconnection was released" + f"The connect_operation_lock for reconnection was released (session: {session_id})" ) async def close(self): @@ -306,20 +328,26 @@

Classes

await self.message_queue.put(message) if self.logger.level <= logging.DEBUG: queue_size = self.message_queue.qsize() + session_id = await self.session_id() self.logger.debug( - f"A new message enqueued (current queue size: {queue_size})" + f"A new message enqueued (current queue size: {queue_size}, session: {session_id})" ) async def process_messages(self): + session_id = await self.session_id() try: while not self.closed: try: await self.process_message() except Exception as e: - self.logger.exception(f"Failed to process a message: {e}") + self.logger.exception( + f"Failed to process a message: {e}, session: {session_id}" + ) except asyncio.CancelledError: if self.trace_enabled: - self.logger.debug("The running process_messages task is now cancelled") + self.logger.debug( + f"The running process_messages task for {session_id} is now cancelled" + ) raise async def process_message(self): @@ -333,10 +361,11 @@

Classes

) async def run_message_listeners(self, message: dict, raw_message: str) -> None: + session_id = await self.session_id() type, envelope_id = message.get("type"), message.get("envelope_id") if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Message processing started (type: {type}, envelope_id: {envelope_id})" + f"Message processing started (type: {type}, envelope_id: {envelope_id}, session: {session_id})" ) try: if message.get("type") == "disconnect": @@ -347,7 +376,9 @@

Classes

try: await listener(self, message, raw_message) except Exception as e: - self.logger.exception(f"Failed to run a message listener: {e}") + self.logger.exception( + f"Failed to run a message listener: {e}, session: {session_id}" + ) if len(self.socket_mode_request_listeners) > 0: request = SocketModeRequest.from_dict(message) @@ -357,14 +388,19 @@

Classes

await listener(self, request) except Exception as e: self.logger.exception( - f"Failed to run a request listener: {e}" + f"Failed to run a request listener: {e}, session: {session_id}" ) except Exception as e: - self.logger.exception(f"Failed to run message listeners: {e}") + self.logger.exception( + f"Failed to run message listeners: {e}, session: {session_id}" + ) finally: if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Message processing completed (type: {type}, envelope_id: {envelope_id})" + f"Message processing completed (" + f"type: {type}, " + f"envelope_id: {envelope_id}, " + f"session: {session_id})" )

Subclasses

@@ -458,11 +494,12 @@

Methods

Expand source code
async def connect_to_new_endpoint(self, force: bool = False):
+    session_id = await self.session_id()
     try:
         await self.connect_operation_lock.acquire()
         if self.trace_enabled:
             self.logger.debug(
-                "For reconnection, the connect_operation_lock was acquired"
+                f"For reconnection, the connect_operation_lock was acquired (session: {session_id})"
             )
         if force or not await self.is_connected():
             self.wss_uri = await self.issue_new_wss_url()
@@ -472,7 +509,7 @@ 

Methods

self.connect_operation_lock.release() if self.trace_enabled: self.logger.debug( - "The connect_operation_lock for reconnection was released" + f"The connect_operation_lock for reconnection was released (session: {session_id})" )
@@ -502,8 +539,9 @@

Methods

await self.message_queue.put(message) if self.logger.level <= logging.DEBUG: queue_size = self.message_queue.qsize() + session_id = await self.session_id() self.logger.debug( - f"A new message enqueued (current queue size: {queue_size})" + f"A new message enqueued (current queue size: {queue_size}, session: {session_id})" ) @@ -579,15 +617,20 @@

Methods

Expand source code
async def process_messages(self):
+    session_id = await self.session_id()
     try:
         while not self.closed:
             try:
                 await self.process_message()
             except Exception as e:
-                self.logger.exception(f"Failed to process a message: {e}")
+                self.logger.exception(
+                    f"Failed to process a message: {e}, session: {session_id}"
+                )
     except asyncio.CancelledError:
         if self.trace_enabled:
-            self.logger.debug("The running process_messages task is now cancelled")
+            self.logger.debug(
+                f"The running process_messages task for {session_id} is now cancelled"
+            )
         raise
@@ -601,10 +644,11 @@

Methods

Expand source code
async def run_message_listeners(self, message: dict, raw_message: str) -> None:
+    session_id = await self.session_id()
     type, envelope_id = message.get("type"), message.get("envelope_id")
     if self.logger.level <= logging.DEBUG:
         self.logger.debug(
-            f"Message processing started (type: {type}, envelope_id: {envelope_id})"
+            f"Message processing started (type: {type}, envelope_id: {envelope_id}, session: {session_id})"
         )
     try:
         if message.get("type") == "disconnect":
@@ -615,7 +659,9 @@ 

Methods

try: await listener(self, message, raw_message) except Exception as e: - self.logger.exception(f"Failed to run a message listener: {e}") + self.logger.exception( + f"Failed to run a message listener: {e}, session: {session_id}" + ) if len(self.socket_mode_request_listeners) > 0: request = SocketModeRequest.from_dict(message) @@ -625,14 +671,19 @@

Methods

await listener(self, request) except Exception as e: self.logger.exception( - f"Failed to run a request listener: {e}" + f"Failed to run a request listener: {e}, session: {session_id}" ) except Exception as e: - self.logger.exception(f"Failed to run message listeners: {e}") + self.logger.exception( + f"Failed to run message listeners: {e}, session: {session_id}" + ) finally: if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Message processing completed (type: {type}, envelope_id: {envelope_id})" + f"Message processing completed (" + f"type: {type}, " + f"envelope_id: {envelope_id}, " + f"session: {session_id})" )
@@ -667,6 +718,19 @@

Methods

await self.send_message(json.dumps(response)) +
+async def session_id(self) ‑> str +
+
+
+
+ +Expand source code + +
async def session_id(self) -> str:
+    return ""
+
+
@@ -707,6 +771,7 @@

run_message_listeners
  • send_message
  • send_socket_mode_response
  • +
  • session_id
  • socket_mode_request_listeners
  • trace_enabled
  • web_client
  • diff --git a/docs/api-docs/slack_sdk/socket_mode/websockets/index.html b/docs/api-docs/slack_sdk/socket_mode/websockets/index.html index ce2813e26..514647f38 100644 --- a/docs/api-docs/slack_sdk/socket_mode/websockets/index.html +++ b/docs/api-docs/slack_sdk/socket_mode/websockets/index.html @@ -47,8 +47,8 @@

    Module slack_sdk.socket_mode.websockets

    from typing import Union, Optional, List, Callable, Awaitable import websockets -from websockets.client import WebSocketClientProtocol from websockets.exceptions import WebSocketException +from websockets.legacy.client import WebSocketClientProtocol from slack_sdk.socket_mode.async_client import AsyncBaseSocketModeClient from slack_sdk.socket_mode.async_listeners import ( @@ -134,43 +134,89 @@

    Module slack_sdk.socket_mode.websockets

    self.message_processor = asyncio.ensure_future(self.process_messages()) async def monitor_current_session(self) -> None: - while not self.closed: - await asyncio.sleep(self.ping_interval) - try: - if self.auto_reconnect_enabled and ( - self.current_session is None or self.current_session.closed - ): - self.logger.info( - "The session seems to be already closed. Reconnecting..." + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: WebSocketClientProtocol = self.current_session + session_id: str = await self.session_id() + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() execution loop for {session_id} started" + ) + try: + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" + ) + break + await asyncio.sleep(self.ping_interval) + try: + if self.auto_reconnect_enabled and ( + session is None or session.closed + ): + self.logger.info( + f"The session ({session_id}) seems to be already closed. Reconnecting..." + ) + await self.connect_to_new_endpoint() + except Exception as e: + self.logger.error( + "Failed to check the current session or reconnect to the server " + f"(error: {type(e).__name__}, message: {e}, session: {session_id})" ) - await self.connect_to_new_endpoint() - except Exception as e: - self.logger.error( - "Failed to check the current session or reconnect to the server " - f"(error: {type(e).__name__}, message: {e})" + except asyncio.CancelledError: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" ) + raise async def receive_messages(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: WebSocketClientProtocol = self.current_session + session_id: str = await self.session_id() consecutive_error_count = 0 - while not self.closed: - try: - message = await self.current_session.recv() - if message is not None: - if isinstance(message, bytes): - message = message.decode("utf-8") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() execution loop with {session_id} started" + ) + try: + while not self.closed: + if session != self.current_session: if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Received message: {message}") - await self.enqueue_message(message) - consecutive_error_count = 0 - except Exception as e: - consecutive_error_count += 1 - self.logger.error( - f"Failed to receive or enqueue a message: {type(e).__name__}, {e}" + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) + break + try: + message = await session.recv() + if message is not None: + if isinstance(message, bytes): + message = message.decode("utf-8") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Received message: {message}, session: {session_id}" + ) + await self.enqueue_message(message) + consecutive_error_count = 0 + except Exception as e: + consecutive_error_count += 1 + self.logger.error( + f"Failed to receive or enqueue a message: {type(e).__name__}, error: {e}, session: {session_id}" + ) + if isinstance(e, websockets.ConnectionClosedError): + await asyncio.sleep(self.ping_interval) + else: + await asyncio.sleep(consecutive_error_count) + except asyncio.CancelledError: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" ) - if isinstance(e, websockets.ConnectionClosedError): - await asyncio.sleep(self.ping_interval) - else: - await asyncio.sleep(consecutive_error_count) + raise async def is_connected(self) -> bool: return ( @@ -179,6 +225,9 @@

    Module slack_sdk.socket_mode.websockets

    and not self.current_session.closed ) + async def session_id(self) -> str: + return self.build_session_id(self.current_session) + async def connect(self): if self.wss_uri is None: self.wss_uri = await self.issue_new_wss_url() @@ -190,36 +239,52 @@

    Module slack_sdk.socket_mode.websockets

    uri=self.wss_uri, ping_interval=self.ping_interval, ) + session_id = await self.session_id() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled - self.logger.info("A new session has been established") + self.logger.info(f"A new session ({session_id}) has been established") - if self.current_session_monitor is None: - self.current_session_monitor = asyncio.ensure_future( - self.monitor_current_session() + if self.current_session_monitor is not None: + self.current_session_monitor.cancel() + self.current_session_monitor = asyncio.ensure_future( + self.monitor_current_session() + ) + + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() executor has been recreated for {session_id}" ) - if self.message_receiver is None: - self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.message_receiver is not None: + self.message_receiver.cancel() + self.message_receiver = asyncio.ensure_future(self.receive_messages()) + + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() executor has been recreated for {session_id}" + ) if old_session is not None: await old_session.close() - self.logger.info("The old session has been abandoned") + old_session_id = self.build_session_id(old_session) + self.logger.info(f"The old session ({old_session_id}) has been abandoned") async def disconnect(self): if self.current_session is not None: await self.current_session.close() async def send_message(self, message: str): + session = self.current_session + session_id = self.build_session_id(session) if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Sending a message: {message}") + self.logger.debug(f"Sending a message: {message}, session: {session_id}") try: - await self.current_session.send(message) + await session.send(message) except WebSocketException as e: # We rarely get this exception while replacing the underlying WebSocket connections. # We can do one more try here as the self.current_session should be ready now. if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Failed to send a message (error: {e}, message: {message})" + f"Failed to send a message (error: {e}, message: {message}, session: {session_id})" " as the underlying connection was replaced. Retrying the same request only one time..." ) # Although acquiring self.connect_operation_lock also for the first method call is the safest way, @@ -229,7 +294,7 @@

    Module slack_sdk.socket_mode.websockets

    await self.current_session.send(message) else: self.logger.warning( - "The current session is no longer active. Failed to send a message" + f"The current session ({session_id}) is no longer active. Failed to send a message" ) raise e finally: @@ -244,7 +309,13 @@

    Module slack_sdk.socket_mode.websockets

    if self.current_session_monitor is not None: self.current_session_monitor.cancel() if self.message_receiver is not None: - self.message_receiver.cancel() + self.message_receiver.cancel() + + @classmethod + def build_session_id(cls, session: WebSocketClientProtocol) -> str: + if session is None: + return None + return "s_" + str(hash(session))
    @@ -356,43 +427,89 @@

    Args

    self.message_processor = asyncio.ensure_future(self.process_messages()) async def monitor_current_session(self) -> None: - while not self.closed: - await asyncio.sleep(self.ping_interval) - try: - if self.auto_reconnect_enabled and ( - self.current_session is None or self.current_session.closed - ): - self.logger.info( - "The session seems to be already closed. Reconnecting..." + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: WebSocketClientProtocol = self.current_session + session_id: str = await self.session_id() + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() execution loop for {session_id} started" + ) + try: + while not self.closed: + if session != self.current_session: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" + ) + break + await asyncio.sleep(self.ping_interval) + try: + if self.auto_reconnect_enabled and ( + session is None or session.closed + ): + self.logger.info( + f"The session ({session_id}) seems to be already closed. Reconnecting..." + ) + await self.connect_to_new_endpoint() + except Exception as e: + self.logger.error( + "Failed to check the current session or reconnect to the server " + f"(error: {type(e).__name__}, message: {e}, session: {session_id})" ) - await self.connect_to_new_endpoint() - except Exception as e: - self.logger.error( - "Failed to check the current session or reconnect to the server " - f"(error: {type(e).__name__}, message: {e})" + except asyncio.CancelledError: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The monitor_current_session task for {session_id} is now cancelled" ) + raise async def receive_messages(self) -> None: + # In the asyncio runtime, accessing a shared object (self.current_session here) from + # multiple tasks can cause race conditions and errors. + # To avoid such, we access only the session that is active when this loop starts. + session: WebSocketClientProtocol = self.current_session + session_id: str = await self.session_id() consecutive_error_count = 0 - while not self.closed: - try: - message = await self.current_session.recv() - if message is not None: - if isinstance(message, bytes): - message = message.decode("utf-8") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() execution loop with {session_id} started" + ) + try: + while not self.closed: + if session != self.current_session: if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Received message: {message}") - await self.enqueue_message(message) - consecutive_error_count = 0 - except Exception as e: - consecutive_error_count += 1 - self.logger.error( - f"Failed to receive or enqueue a message: {type(e).__name__}, {e}" + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" + ) + break + try: + message = await session.recv() + if message is not None: + if isinstance(message, bytes): + message = message.decode("utf-8") + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Received message: {message}, session: {session_id}" + ) + await self.enqueue_message(message) + consecutive_error_count = 0 + except Exception as e: + consecutive_error_count += 1 + self.logger.error( + f"Failed to receive or enqueue a message: {type(e).__name__}, error: {e}, session: {session_id}" + ) + if isinstance(e, websockets.ConnectionClosedError): + await asyncio.sleep(self.ping_interval) + else: + await asyncio.sleep(consecutive_error_count) + except asyncio.CancelledError: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"The running receive_messages task for {session_id} is now cancelled" ) - if isinstance(e, websockets.ConnectionClosedError): - await asyncio.sleep(self.ping_interval) - else: - await asyncio.sleep(consecutive_error_count) + raise async def is_connected(self) -> bool: return ( @@ -401,6 +518,9 @@

    Args

    and not self.current_session.closed ) + async def session_id(self) -> str: + return self.build_session_id(self.current_session) + async def connect(self): if self.wss_uri is None: self.wss_uri = await self.issue_new_wss_url() @@ -412,36 +532,52 @@

    Args

    uri=self.wss_uri, ping_interval=self.ping_interval, ) + session_id = await self.session_id() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled - self.logger.info("A new session has been established") + self.logger.info(f"A new session ({session_id}) has been established") + + if self.current_session_monitor is not None: + self.current_session_monitor.cancel() + self.current_session_monitor = asyncio.ensure_future( + self.monitor_current_session() + ) - if self.current_session_monitor is None: - self.current_session_monitor = asyncio.ensure_future( - self.monitor_current_session() + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new monitor_current_session() executor has been recreated for {session_id}" ) - if self.message_receiver is None: - self.message_receiver = asyncio.ensure_future(self.receive_messages()) + if self.message_receiver is not None: + self.message_receiver.cancel() + self.message_receiver = asyncio.ensure_future(self.receive_messages()) + + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"A new receive_messages() executor has been recreated for {session_id}" + ) if old_session is not None: await old_session.close() - self.logger.info("The old session has been abandoned") + old_session_id = self.build_session_id(old_session) + self.logger.info(f"The old session ({old_session_id}) has been abandoned") async def disconnect(self): if self.current_session is not None: await self.current_session.close() async def send_message(self, message: str): + session = self.current_session + session_id = self.build_session_id(session) if self.logger.level <= logging.DEBUG: - self.logger.debug(f"Sending a message: {message}") + self.logger.debug(f"Sending a message: {message}, session: {session_id}") try: - await self.current_session.send(message) + await session.send(message) except WebSocketException as e: # We rarely get this exception while replacing the underlying WebSocket connections. # We can do one more try here as the self.current_session should be ready now. if self.logger.level <= logging.DEBUG: self.logger.debug( - f"Failed to send a message (error: {e}, message: {message})" + f"Failed to send a message (error: {e}, message: {message}, session: {session_id})" " as the underlying connection was replaced. Retrying the same request only one time..." ) # Although acquiring self.connect_operation_lock also for the first method call is the safest way, @@ -451,7 +587,7 @@

    Args

    await self.current_session.send(message) else: self.logger.warning( - "The current session is no longer active. Failed to send a message" + f"The current session ({session_id}) is no longer active. Failed to send a message" ) raise e finally: @@ -466,7 +602,13 @@

    Args

    if self.current_session_monitor is not None: self.current_session_monitor.cancel() if self.message_receiver is not None: - self.message_receiver.cancel() + self.message_receiver.cancel() + + @classmethod + def build_session_id(cls, session: WebSocketClientProtocol) -> str: + if session is None: + return None + return "s_" + str(hash(session))

    Ancestors

    diff --git a/slack_sdk/version.py b/slack_sdk/version.py index 9e0a51126..7fcee6d98 100644 --- a/slack_sdk/version.py +++ b/slack_sdk/version.py @@ -1,2 +1,2 @@ """Check the latest version at https://pypi.org/project/slack-sdk/""" -__version__ = "3.11.0" +__version__ = "3.11.1"