Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cdeler committed Sep 24, 2020
1 parent f9eddc3 commit f6da5c4
Show file tree
Hide file tree
Showing 8 changed files with 500 additions and 203 deletions.
18 changes: 6 additions & 12 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .._backends.base import lookup_async_backend
from .._exceptions import LocalProtocolError, PoolTimeout, UnsupportedProtocol
from .._threadlock import ThreadLock
from .._types import URL, Headers, Origin, Socks5ProxyConfig, TimeoutDict
from .._types import URL, Headers, Origin, SocksProxyConfig, TimeoutDict
from .._utils import get_logger, origin_to_url_string, url_to_origin
from .base import (
AsyncByteStream,
Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(
local_address: str = None,
max_keepalive: int = None,
backend: str = "auto",
socks5_proxy_conf: Socks5ProxyConfig = None,
socks_proxy_config: SocksProxyConfig = None,
):
if max_keepalive is not None:
warnings.warn(
Expand All @@ -119,7 +119,7 @@ def __init__(
self._thread_lock = ThreadLock()
self._backend = lookup_async_backend(backend)
self._next_keepalive_check = 0.0
self._socks5_proxy_conf = socks5_proxy_conf
self._socks_proxy_conf = socks_proxy_config

if http2:
try:
Expand All @@ -130,7 +130,7 @@ def __init__(
"package is not installed. Use 'pip install httpcore[http2]'."
)

if socks5_proxy_conf is not None:
if socks_proxy_config is not None:
module_spec = importlib.util.find_spec("socksio")
if module_spec is None:
raise ImportError(
Expand Down Expand Up @@ -188,23 +188,17 @@ async def arequest(
connection = await self._get_connection_from_pool(origin)

if connection is None:
if self._socks5_proxy_conf:
if self._socks_proxy_conf:
from .socks_proxy import AsyncSOCKSConnection

proxy_origin = (
b"socks5",
self._socks5_proxy_conf.host,
self._socks5_proxy_conf.port,
)
connection = AsyncSOCKSConnection(
origin=origin,
http2=self._http2,
uds=self._uds,
ssl_context=self._ssl_context,
local_address=self._local_address,
backend=self._backend,
proxy_origin=proxy_origin,
proxy_credentials=self._socks5_proxy_conf.auth_credentials,
proxy_config=self._socks_proxy_conf,
)
else:
connection = AsyncHTTPConnection(
Expand Down
236 changes: 173 additions & 63 deletions httpcore/_async/socks_proxy.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,72 @@
from ssl import SSLContext

from socksio import socks5
from socksio import socks4, socks5

from .._backends.auto import AsyncBackend, AsyncSocketStream
from .._exceptions import ProxyError
from .._types import Origin, Socks5ProxyCredentials, TimeoutDict
from .._types import (
Origin,
Socks5ProxyCredentials,
SocksProxyConfig,
SocksProxyType,
TimeoutDict,
)
from .._utils import get_logger
from .connection import AsyncHTTPConnection

logger = get_logger(__name__)


class AsyncSOCKSConnection(AsyncHTTPConnection):
def __init__(
class AsyncSocksProxyProtocol:
async def connect(
self,
origin: Origin,
http2: bool = False,
uds: str = None,
ssl_context: SSLContext = None,
socket: AsyncSocketStream = None,
local_address: str = None,
backend: AsyncBackend = None,
*,
proxy_origin: Origin,
proxy_credentials: Socks5ProxyCredentials = None,
):
assert proxy_origin[0] in (b"socks5",)
socket: AsyncSocketStream,
hostname: bytes,
port: int,
timeout: TimeoutDict,
) -> None:
raise NotImplementedError

super().__init__(
origin, http2, uds, ssl_context, socket, local_address, backend
)
self.proxy_origin = proxy_origin

class AsyncSocks5ProxyProtocol(AsyncSocksProxyProtocol):
def __init__(self, proxy_credentials: Socks5ProxyCredentials = None):
self.proxy_connection = socks5.SOCKS5Connection()
self.proxy_credentials = proxy_credentials

async def _open_socket(self, timeout: TimeoutDict = None) -> AsyncSocketStream:
_, proxy_hostname, proxy_port = self.proxy_origin
scheme, hostname, port = self.origin
ssl_context = self.ssl_context if scheme == b"https" else None
timeout = timeout or {}
async def connect(
self,
socket: AsyncSocketStream,
hostname: bytes,
port: int,
timeout: TimeoutDict,
) -> None:
await self._auth_if_required(socket, timeout)

try:
proxy_socket = await self.backend.open_tcp_stream(
proxy_hostname,
proxy_port,
None,
timeout,
local_address=self.local_address,
)
is_connected = await self._connect_to_remote_host(
socket, hostname, port, timeout
)

await self._auth_proxy(proxy_socket, timeout)
await self._connect_through_proxy(proxy_socket, hostname, port, timeout)
if not is_connected:
raise ProxyError("Cannot connect through the proxy.")

if ssl_context:
proxy_socket = await proxy_socket.start_tls(
hostname, ssl_context, timeout
async def _auth_if_required(
self, socket: AsyncSocketStream, timeout: TimeoutDict
) -> None:
is_auth_required = await self._check_for_authentication(socket, timeout)

if is_auth_required:
if self.proxy_credentials is None:
raise ProxyError(
"This proxy requires auth, but you didn't set user/password"
)

return proxy_socket
except Exception: # noqa: PIE786
self.connect_failed = True
raise
user, password = self.proxy_credentials

await self._auth_using_login_and_password(socket, user, password, timeout)

async def _auth_proxy(
async def _check_for_authentication(
self, socket: AsyncSocketStream, timeout: TimeoutDict
) -> None:
) -> bool:
auth_request = socks5.SOCKS5AuthMethodsRequest(
[
socks5.SOCKS5AuthMethod.NO_AUTH_REQUIRED,
Expand All @@ -80,32 +82,35 @@ async def _auth_proxy(
data = await socket.read(1024, timeout)
auth_ev = self.proxy_connection.receive_data(data)

if auth_ev.method == socks5.SOCKS5AuthMethod.USERNAME_PASSWORD: # type: ignore
if self.proxy_credentials is None:
raise ProxyError(
"This proxy requires auth, but you didn't set user/password"
)
return (
auth_ev.method == socks5.SOCKS5AuthMethod.USERNAME_PASSWORD # type: ignore
)

user, password = self.proxy_credentials
user_password_request = socks5.SOCKS5UsernamePasswordRequest(user, password)
self.proxy_connection.send(user_password_request)
await socket.write(self.proxy_connection.data_to_send(), timeout)
user_password_response = await socket.read(2048, timeout)
async def _auth_using_login_and_password(
self,
socket: AsyncSocketStream,
user: bytes,
password: bytes,
timeout: TimeoutDict,
) -> None:
user_password_request = socks5.SOCKS5UsernamePasswordRequest(user, password)
self.proxy_connection.send(user_password_request)

user_password_event = self.proxy_connection.receive_data(
user_password_response
)
await socket.write(self.proxy_connection.data_to_send(), timeout)
user_password_response = await socket.read(2048, timeout)

user_password_event = self.proxy_connection.receive_data(user_password_response)

if not user_password_event.success: # type: ignore
raise ProxyError("Invalid user/password provided to proxy auth")
if not user_password_event.success: # type: ignore
raise ProxyError("Invalid user/password provided to proxy auth")

async def _connect_through_proxy(
async def _connect_to_remote_host(
self,
socket: AsyncSocketStream,
hostname: bytes,
port: int,
timeout: TimeoutDict,
) -> None:
) -> bool:
connect_request = socks5.SOCKS5CommandRequest.from_address(
socks5.SOCKS5Command.CONNECT, (hostname, port)
)
Expand All @@ -117,5 +122,110 @@ async def _connect_through_proxy(
data = await socket.read(1024, timeout)
event = self.proxy_connection.receive_data(data)

if event.reply_code != socks5.SOCKS5ReplyCode.SUCCEEDED: # type: ignore
raise ProxyError(f"Cannot connect through the proxy. {event!r}")
return event.reply_code == socks5.SOCKS5ReplyCode.SUCCEEDED # type: ignore


class AsyncSocks4Protocol(AsyncSocksProxyProtocol):
def __init__(self, user_id: bytes):
self.proxy_connection = socks4.SOCKS4Connection(user_id=user_id)

async def connect(
self,
socket: AsyncSocketStream,
hostname: bytes,
port: int,
timeout: TimeoutDict,
) -> None:
connection_request = socks4.SOCKS4Request.from_address(
socks4.SOCKS4Command.CONNECT, (hostname, port)
)
self.proxy_connection.send(connection_request)
await socket.write(self.proxy_connection.data_to_send(), timeout)

connection_response = await socket.read(1024, timeout)
connection_event = self.proxy_connection.receive_data(connection_response)
if connection_event.reply_code != socks4.SOCKS4ReplyCode.REQUEST_GRANTED:
raise ProxyError(
"Server could not connect to remote host: "
f"{connection_event.reply_code}"
)


class AsyncSOCKSConnection(AsyncHTTPConnection):
"""
SOCKS5 proxy workflow:
1. establish a connection with proxy server
2. if it's SOCKS5 proxy, then check for authentication
2.1 if auth is required, then authenticate using user-password (optional)
3. connect to remote server through the proxy
4. upgrade the connection to TLS (optional)
SOCKS4(a) proxy workflow:
1. establish a connection with proxy server
2. connect to remote server through the proxy
3. upgrade the connection to TLS (optional)
"""

def __init__(
self,
origin: Origin,
http2: bool = False,
uds: str = None,
ssl_context: SSLContext = None,
socket: AsyncSocketStream = None,
local_address: str = None,
backend: AsyncBackend = None,
*,
proxy_config: SocksProxyConfig,
):
super().__init__(
origin, http2, uds, ssl_context, socket, local_address, backend
)
self.proxy_config = proxy_config

if proxy_config.proxy_type is SocksProxyType.socks5:
if not (
isinstance(proxy_config.auth_credentials, Socks5ProxyCredentials)
or proxy_config.auth_credentials is None
):
raise ValueError
self.proxy_protocol: AsyncSocksProxyProtocol = AsyncSocks5ProxyProtocol(
proxy_config.auth_credentials
)
elif proxy_config.proxy_type is SocksProxyType.socks4:
if not isinstance(proxy_config.auth_credentials, bytes):
raise ValueError("")

self.proxy_protocol = AsyncSocks4Protocol(proxy_config.auth_credentials)
elif proxy_config.proxy_type is SocksProxyType.socks4a:
raise NotImplementedError
else:
raise AssertionError

async def _open_socket(self, timeout: TimeoutDict = None) -> AsyncSocketStream:
proxy_hostname, proxy_port = self.proxy_config.origin
scheme, hostname, port = self.origin
ssl_context = self.ssl_context if scheme == b"https" else None
timeout = timeout or {}

try:
proxy_socket = await self.backend.open_tcp_stream(
proxy_hostname,
proxy_port,
None,
timeout,
local_address=self.local_address,
)

await self.proxy_protocol.connect(proxy_socket, hostname, port, timeout)

if ssl_context:
proxy_socket = await proxy_socket.start_tls(
hostname, ssl_context, timeout
)

return proxy_socket
except Exception: # noqa: PIE786
self.connect_failed = True
raise

0 comments on commit f6da5c4

Please sign in to comment.