From a80c248c34a77e106551bf997e726457b03f42b5 Mon Sep 17 00:00:00 2001 From: stephan-hof Date: Tue, 16 Aug 2022 17:57:17 +0200 Subject: [PATCH] Don't close ConnectionPools from PoolManager until unreferenced Co-authored-by: Seth Michael Larson --- changelog/1252.bugfix.rst | 2 ++ src/urllib3/connectionpool.py | 32 +++++++++++++++----- src/urllib3/poolmanager.py | 5 +-- test/test_poolmanager.py | 57 +++++++++++++++++++---------------- 4 files changed, 58 insertions(+), 38 deletions(-) create mode 100644 changelog/1252.bugfix.rst diff --git a/changelog/1252.bugfix.rst b/changelog/1252.bugfix.rst new file mode 100644 index 0000000000..c30f5f12a5 --- /dev/null +++ b/changelog/1252.bugfix.rst @@ -0,0 +1,2 @@ +Fixed thread-safety issue where accessing a `PoolManager` with many distinct origins +would cause connection pools to be closed while requests are in progress. diff --git a/src/urllib3/connectionpool.py b/src/urllib3/connectionpool.py index 798e285e03..8c39c179bf 100644 --- a/src/urllib3/connectionpool.py +++ b/src/urllib3/connectionpool.py @@ -3,6 +3,7 @@ import queue import sys import warnings +import weakref from http.client import HTTPResponse as _HttplibHTTPResponse from socket import timeout as SocketTimeout from types import TracebackType @@ -220,6 +221,16 @@ def __init__( self.conn_kw["proxy"] = self.proxy self.conn_kw["proxy_config"] = self.proxy_config + # Do not pass 'self' as callback to 'finalize'. + # Then the 'finalize' would keep an endless living (leak) to self. + # By just passing a reference to the pool allows the garbage collector + # to free self if nobody else has a reference to it. + pool = self.pool + + # Close all the HTTPConnections in the pool before the + # HTTPConnectionPool object is garbage collected. + weakref.finalize(self, _close_pool_connections, pool) + def _new_conn(self) -> HTTPConnection: """ Return a fresh :class:`HTTPConnection`. @@ -503,14 +514,8 @@ def close(self) -> None: # Disable access to the pool old_pool, self.pool = self.pool, None - try: - while True: - conn = old_pool.get(block=False) - if conn: - conn.close() - - except queue.Empty: - pass # Done. + # Close all the HTTPConnections in the pool. + _close_pool_connections(old_pool) def is_same_host(self, url: str) -> bool: """ @@ -1119,3 +1124,14 @@ def _normalize_host(host: Optional[str], scheme: Optional[str]) -> Optional[str] if host and host.startswith("[") and host.endswith("]"): host = host[1:-1] return host + + +def _close_pool_connections(pool: "queue.LifoQueue[Any]") -> None: + """Drains a queue of connections and closes each one.""" + try: + while True: + conn = pool.get(block=False) + if conn: + conn.close() + except queue.Empty: + pass # Done. diff --git a/src/urllib3/poolmanager.py b/src/urllib3/poolmanager.py index 82e205e435..532bc3cec3 100644 --- a/src/urllib3/poolmanager.py +++ b/src/urllib3/poolmanager.py @@ -215,11 +215,8 @@ def __init__( super().__init__(headers) self.connection_pool_kw = connection_pool_kw - def dispose_func(p: Any) -> None: - p.close() - self.pools: RecentlyUsedContainer[PoolKey, HTTPConnectionPool] - self.pools = RecentlyUsedContainer(num_pools, dispose_func=dispose_func) + self.pools = RecentlyUsedContainer(num_pools) # Locally set the pool classes and keys so other PoolManagers can # override them. diff --git a/test/test_poolmanager.py b/test/test_poolmanager.py index 743d5dfd75..0a75063b7c 100644 --- a/test/test_poolmanager.py +++ b/test/test_poolmanager.py @@ -1,3 +1,4 @@ +import gc import socket from test import resolvesLocalhostFQDN from typing import Optional @@ -8,7 +9,7 @@ from urllib3 import connection_from_url from urllib3.connectionpool import HTTPSConnectionPool -from urllib3.exceptions import ClosedPoolError, LocationValueError +from urllib3.exceptions import LocationValueError from urllib3.poolmanager import ( _DEFAULT_BLOCKSIZE, PoolKey, @@ -69,24 +70,12 @@ def test_many_urls(self) -> None: def test_manager_clear(self) -> None: p = PoolManager(5) - conn_pool = p.connection_from_url("http://google.com") + p.connection_from_url("http://google.com") assert len(p.pools) == 1 - conn = conn_pool._get_conn() - p.clear() assert len(p.pools) == 0 - with pytest.raises(ClosedPoolError): - conn_pool._get_conn() - - conn_pool._put_conn(conn) - - with pytest.raises(ClosedPoolError): - conn_pool._get_conn() - - assert len(p.pools) == 0 - @pytest.mark.parametrize("url", ["http://@", None]) def test_nohost(self, url: Optional[str]) -> None: p = PoolManager(5) @@ -95,19 +84,8 @@ def test_nohost(self, url: Optional[str]) -> None: def test_contextmanager(self) -> None: with PoolManager(1) as p: - conn_pool = p.connection_from_url("http://google.com") + p.connection_from_url("http://google.com") assert len(p.pools) == 1 - conn = conn_pool._get_conn() - - assert len(p.pools) == 0 - - with pytest.raises(ClosedPoolError): - conn_pool._get_conn() - - conn_pool._put_conn(conn) - - with pytest.raises(ClosedPoolError): - conn_pool._get_conn() assert len(p.pools) == 0 @@ -468,3 +446,30 @@ def test_e2e_connect_to_ipv6_scoped_tls( conn.connect() assert ssl_wrap_socket.call_args[1]["server_hostname"] == "a::b" + + def test_thread_safty(self) -> None: + pool_manager = PoolManager(num_pools=2) + + # thread 1 gets a pool for host x + pool_1 = pool_manager.connection_from_url("http://host_x:80/") + + # thread 2 gets a pool for host y + pool_2 = pool_manager.connection_from_url("http://host_y:80/") + + # thread 3 gets a pool for host z + pool_3 = pool_manager.connection_from_url("http://host_z:80") + + # None of the pools should be closed, since all of them are referenced. + assert pool_1.pool is not None + assert pool_2.pool is not None + assert pool_3.pool is not None + + conn_queue = pool_1.pool + assert conn_queue.qsize() > 0 + + # thread 1 stops. + del pool_1 + gc.collect() + + # Connection should be closed, because reference to pool_1 is gone. + assert conn_queue.qsize() == 0