Skip to content

Commit

Permalink
Don't close ConnectionPools from PoolManager until unreferenced
Browse files Browse the repository at this point in the history
Co-authored-by: Seth Michael Larson <sethmichaellarson@gmail.com>
  • Loading branch information
stephan-hof and sethmlarson committed Aug 16, 2022
1 parent dd18896 commit a80c248
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 38 deletions.
2 changes: 2 additions & 0 deletions changelog/1252.bugfix.rst
@@ -0,0 +1,2 @@
Fixed thread-safety issue where accessing a `PoolManager` with many distinct origins
would cause connection pools to be closed while requests are in progress.
32 changes: 24 additions & 8 deletions src/urllib3/connectionpool.py
Expand Up @@ -3,6 +3,7 @@
import queue
import sys
import warnings
import weakref
from http.client import HTTPResponse as _HttplibHTTPResponse
from socket import timeout as SocketTimeout
from types import TracebackType
Expand Down Expand Up @@ -220,6 +221,16 @@ def __init__(
self.conn_kw["proxy"] = self.proxy
self.conn_kw["proxy_config"] = self.proxy_config

# Do not pass 'self' as callback to 'finalize'.
# Then the 'finalize' would keep an endless living (leak) to self.
# By just passing a reference to the pool allows the garbage collector
# to free self if nobody else has a reference to it.
pool = self.pool

# Close all the HTTPConnections in the pool before the
# HTTPConnectionPool object is garbage collected.
weakref.finalize(self, _close_pool_connections, pool)

def _new_conn(self) -> HTTPConnection:
"""
Return a fresh :class:`HTTPConnection`.
Expand Down Expand Up @@ -503,14 +514,8 @@ def close(self) -> None:
# Disable access to the pool
old_pool, self.pool = self.pool, None

try:
while True:
conn = old_pool.get(block=False)
if conn:
conn.close()

except queue.Empty:
pass # Done.
# Close all the HTTPConnections in the pool.
_close_pool_connections(old_pool)

def is_same_host(self, url: str) -> bool:
"""
Expand Down Expand Up @@ -1119,3 +1124,14 @@ def _normalize_host(host: Optional[str], scheme: Optional[str]) -> Optional[str]
if host and host.startswith("[") and host.endswith("]"):
host = host[1:-1]
return host


def _close_pool_connections(pool: "queue.LifoQueue[Any]") -> None:
"""Drains a queue of connections and closes each one."""
try:
while True:
conn = pool.get(block=False)
if conn:
conn.close()
except queue.Empty:
pass # Done.
5 changes: 1 addition & 4 deletions src/urllib3/poolmanager.py
Expand Up @@ -215,11 +215,8 @@ def __init__(
super().__init__(headers)
self.connection_pool_kw = connection_pool_kw

def dispose_func(p: Any) -> None:
p.close()

self.pools: RecentlyUsedContainer[PoolKey, HTTPConnectionPool]
self.pools = RecentlyUsedContainer(num_pools, dispose_func=dispose_func)
self.pools = RecentlyUsedContainer(num_pools)

# Locally set the pool classes and keys so other PoolManagers can
# override them.
Expand Down
57 changes: 31 additions & 26 deletions test/test_poolmanager.py
@@ -1,3 +1,4 @@
import gc
import socket
from test import resolvesLocalhostFQDN
from typing import Optional
Expand All @@ -8,7 +9,7 @@

from urllib3 import connection_from_url
from urllib3.connectionpool import HTTPSConnectionPool
from urllib3.exceptions import ClosedPoolError, LocationValueError
from urllib3.exceptions import LocationValueError
from urllib3.poolmanager import (
_DEFAULT_BLOCKSIZE,
PoolKey,
Expand Down Expand Up @@ -69,24 +70,12 @@ def test_many_urls(self) -> None:
def test_manager_clear(self) -> None:
p = PoolManager(5)

conn_pool = p.connection_from_url("http://google.com")
p.connection_from_url("http://google.com")
assert len(p.pools) == 1

conn = conn_pool._get_conn()

p.clear()
assert len(p.pools) == 0

with pytest.raises(ClosedPoolError):
conn_pool._get_conn()

conn_pool._put_conn(conn)

with pytest.raises(ClosedPoolError):
conn_pool._get_conn()

assert len(p.pools) == 0

@pytest.mark.parametrize("url", ["http://@", None])
def test_nohost(self, url: Optional[str]) -> None:
p = PoolManager(5)
Expand All @@ -95,19 +84,8 @@ def test_nohost(self, url: Optional[str]) -> None:

def test_contextmanager(self) -> None:
with PoolManager(1) as p:
conn_pool = p.connection_from_url("http://google.com")
p.connection_from_url("http://google.com")
assert len(p.pools) == 1
conn = conn_pool._get_conn()

assert len(p.pools) == 0

with pytest.raises(ClosedPoolError):
conn_pool._get_conn()

conn_pool._put_conn(conn)

with pytest.raises(ClosedPoolError):
conn_pool._get_conn()

assert len(p.pools) == 0

Expand Down Expand Up @@ -468,3 +446,30 @@ def test_e2e_connect_to_ipv6_scoped_tls(
conn.connect()

assert ssl_wrap_socket.call_args[1]["server_hostname"] == "a::b"

def test_thread_safty(self) -> None:
pool_manager = PoolManager(num_pools=2)

# thread 1 gets a pool for host x
pool_1 = pool_manager.connection_from_url("http://host_x:80/")

# thread 2 gets a pool for host y
pool_2 = pool_manager.connection_from_url("http://host_y:80/")

# thread 3 gets a pool for host z
pool_3 = pool_manager.connection_from_url("http://host_z:80")

# None of the pools should be closed, since all of them are referenced.
assert pool_1.pool is not None
assert pool_2.pool is not None
assert pool_3.pool is not None

conn_queue = pool_1.pool
assert conn_queue.qsize() > 0

# thread 1 stops.
del pool_1
gc.collect()

# Connection should be closed, because reference to pool_1 is gone.
assert conn_queue.qsize() == 0

0 comments on commit a80c248

Please sign in to comment.