Skip to content

Commit

Permalink
Added socks_proxy fixture to make tests runnable
Browse files Browse the repository at this point in the history
  • Loading branch information
cdeler committed Oct 2, 2020
1 parent 8292194 commit 4618f88
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
13 changes: 5 additions & 8 deletions tests/async_tests/test_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ async def test_explicit_backend_name() -> None:
"http2",
[True, False],
)
async def test_socks_proxy_connection_without_auth(url, http2):
async def test_socks_proxy_connection_without_auth(socks_proxy, url, http2):
(_, hostname, *_) = url
proxy_origin = (b"socks5", b"localhost", 1085)
proxy_origin = socks_proxy.without_auth
headers = [(b"host", hostname)]
method = b"GET"

Expand All @@ -395,17 +395,14 @@ async def test_socks_proxy_connection_without_auth(url, http2):
],
)
@pytest.mark.parametrize("http2", [True, False])
async def test_socks5_proxy_connection_with_auth(url, http2):
async def test_socks5_proxy_connection_with_auth(socks_proxy, url, http2):
(_, hostname, *_) = url
proxy_origin = (b"socks5", b"localhost", 1086)
proxy_origin, credentials = socks_proxy.with_auth
headers = [(b"host", hostname)]
method = b"GET"

async with httpcore.AsyncSocksProxy(
proxy_origin,
"socks5",
SocksProxyCredentials(username=b"username", password=b"password"),
http2=http2,
proxy_origin, "socks5", credentials, http2=http2
) as connection:
(
status_code,
Expand Down
50 changes: 49 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import trustme
import uvicorn

from httpcore._types import URL
from httpcore._types import Origin, SocksProxyCredentials, URL


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -59,6 +59,54 @@ def proxy_server() -> typing.Iterator[URL]:
proc.kill()


SocksFixture = typing.NamedTuple(
"SocksFixture",
[
("with_auth", typing.Tuple[Origin, SocksProxyCredentials]),
("without_auth", Origin),
],
)


@pytest.fixture(scope="session")
def socks_proxy() -> typing.Iterator[SocksFixture]:
no_auth_host = "127.0.01"
no_auth_port = 1085

auth_host = "127.0.0.1"
auth_port = 1086
auth_user = "username"
auth_pwd = "password"

proc = None
try:
command_str = (
f"pproxy -l socks5://{no_auth_host}:{no_auth_port} "
f"--auth 0 -l 'socks5://{auth_host}:{auth_port}#{auth_user}:{auth_pwd}'"
)

command = shlex.split(command_str)
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

wait_until_pproxy_serve_on_port(auth_host, auth_port)
wait_until_pproxy_serve_on_port(no_auth_host, no_auth_port)

yield SocksFixture(
with_auth=(
(
b"socks5",
auth_host.encode(),
auth_port,
),
SocksProxyCredentials(auth_user.encode(), auth_pwd.encode()),
),
without_auth=(b"socks5", no_auth_host.encode(), no_auth_port),
)
finally:
if proc is not None:
proc.kill()


class Server(uvicorn.Server):
def install_signal_handlers(self) -> None:
pass
Expand Down

0 comments on commit 4618f88

Please sign in to comment.