Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Remove unneeded loop arguments #1006

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/1002.removal
@@ -0,0 +1,2 @@
Remove explicit loop arguments from connection classes.
Add loop argument to PubSub.run_in_thread.
14 changes: 9 additions & 5 deletions aioredis/client.py
Expand Up @@ -4133,7 +4133,10 @@ def handle_message(self, response, ignore_subscribe_messages=False):
return message

def run_in_thread(
self, daemon: bool = False, exception_handler: Callable = None
self,
daemon: bool = False,
exception_handler: Callable = None,
loop: asyncio.AbstractEventLoop = None,
) -> "PubSubWorkerThread":
for channel, handler in self.channels.items():
if handler is None:
Expand All @@ -4143,7 +4146,7 @@ def run_in_thread(
raise PubSubError(f"Pattern: '{pattern}' has no handler registered")

thread = PubSubWorkerThread(
self, daemon=daemon, exception_handler=exception_handler
self, daemon=daemon, exception_handler=exception_handler, loop=loop
)
thread.start()
return thread
Expand Down Expand Up @@ -4171,6 +4174,7 @@ def __init__(
daemon: bool = False,
poll_timeout: float = 1.0,
exception_handler: PSWorkerThreadExcHandlerT = None,
loop: asyncio.AbstractEventLoop = None,
):
super().__init__()
self.daemon = daemon
Expand All @@ -4179,9 +4183,9 @@ def __init__(
self.exception_handler = exception_handler
self._running = threading.Event()
# Make sure we have the current thread loop before we
# fork into the new thread. If not loop has been set on the connection
# pool use the current default event loop.
self.loop = pubsub.connection_pool.loop or asyncio.get_event_loop()
# fork into the new thread. If no loop has been specified
# use the current default event loop.
self.loop = loop or asyncio.get_event_loop()

async def _run(self):
pubsub = self.pubsub
Expand Down
10 changes: 1 addition & 9 deletions aioredis/connection.py
Expand Up @@ -560,7 +560,6 @@ class Connection:
"_parser",
"_connect_callbacks",
"_buffer_cutoff",
"_loop",
"__dict__",
)

Expand All @@ -586,7 +585,6 @@ def __init__(
client_name: str = None,
username: str = None,
encoder_class: Type[Encoder] = Encoder,
loop: asyncio.AbstractEventLoop = None,
):
self.pid = os.getpid()
self.host = host
Expand All @@ -612,7 +610,6 @@ def __init__(
)
self._connect_callbacks: List[ConnectCallbackT] = []
self._buffer_cutoff = 6000
self._loop = loop

def __repr__(self):
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
Expand All @@ -627,7 +624,7 @@ def repr_pieces(self):
def __del__(self):
try:
if self.is_connected:
loop = self._loop or asyncio.get_event_loop()
loop = asyncio.get_event_loop()
coro = self.disconnect()
if loop.is_running():
loop.create_task(coro)
Expand Down Expand Up @@ -682,7 +679,6 @@ async def _connect(self):
host=self.host,
port=self.port,
ssl=self.ssl_context.get() if self.ssl_context else None,
loop=self._loop,
)
self._reader = reader
self._writer = writer
Expand Down Expand Up @@ -817,7 +813,6 @@ async def send_packed_command(
await asyncio.wait_for(
self._send_packed_command(command),
self.socket_timeout,
loop=self._loop or asyncio.get_event_loop(),
)
except asyncio.TimeoutError:
await self.disconnect()
Expand Down Expand Up @@ -1051,7 +1046,6 @@ def __init__(
socket_read_size: int = 65536,
health_check_interval: float = 0.0,
client_name=None,
loop: asyncio.AbstractEventLoop = None,
):
self.pid = os.getpid()
self.path = path
Expand All @@ -1070,7 +1064,6 @@ def __init__(
self._parser = parser_class(socket_read_size=socket_read_size)
self._connect_callbacks = []
self._buffer_cutoff = 6000
self._loop = loop

def repr_pieces(self) -> Iterable[Tuple[str, Union[str, int]]]:
pieces = [
Expand Down Expand Up @@ -1273,7 +1266,6 @@ def __init__(
self._available_connections: List[Connection]
self._in_use_connections: Set[Connection]
self.reset() # lgtm [py/init-calls-subclass]
self.loop = self.connection_kwargs.get("loop")
self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)

def __repr__(self):
Expand Down