Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working select on write - fixes #695 #2300

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
89 changes: 85 additions & 4 deletions paramiko/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def __init__(self, chanid):
self.lock = threading.Lock()
self.out_buffer_cv = threading.Condition(self.lock)
self.in_window_size = 0
self.out_window_size = 0
self._out_window_size = 0
self.in_max_packet_size = 0
self.out_max_packet_size = 0
self.in_window_threshold = 0
Expand All @@ -127,12 +127,26 @@ def __init__(self, chanid):
self._name = str(chanid)
self.logger = util.get_logger("paramiko.transport")
self._pipe = None
self._write_pipe = None
self.event = threading.Event()
self.event_ready = False
self.combine_stderr = False
self.exit_status = -1
self.origin_addr = None

@property
def out_window_size(self):
return self._out_window_size

@out_window_size.setter
def out_window_size(self, value):
self._out_window_size = value
if self._write_pipe is not None:
if value == 0:
self._write_pipe.clear()
else:
self._write_pipe.set()

def __del__(self):
try:
self.close()
Expand Down Expand Up @@ -659,6 +673,10 @@ def close(self):
self._pipe.close()
self._pipe = None

if self._write_pipe is not None:
self._write_pipe.close()
self._write_pipe = None

if not self.active or self.closed:
return
msgs = self._close_internal()
Expand Down Expand Up @@ -913,9 +931,14 @@ def makefile_stdin(self, *params):

def fileno(self):
"""
Returns an OS-level file descriptor which can be used for polling, but
but not for reading or writing. This is primarily to allow Python's
``select`` module to work.
Legacy: calls fileno_read for backwards compatibility. Please use
fileno_read and fileno_write, or the fileno methods of the file-like
wrappers.

Returns an OS-level file descriptor which can be used for polling for
read availability on this channel, but not for actual reading or
writing. This is primarily to allow Python's ``select`` module to
work.

The first time ``fileno`` is called on a channel, a pipe is created to
simulate real OS-level file descriptor (FD) behavior. Because of this,
Expand All @@ -925,6 +948,26 @@ def fileno(self):

:return: an OS-level file descriptor (`int`)

.. warning::
This method causes channel reads to be slightly less efficient.
"""
return self.fileno_read()

def fileno_read(self):
"""
Returns an OS-level file descriptor which can be used for polling for
read availability on this channel, but not for actual reading or
writing. This is primarily to allow Python's ``select`` module to
work.

The first time ``fileno_read`` is called on a channel, a pipe is
created to simulate real OS-level file descriptor (FD) behavior.
Because of this, two OS-level FDs are created, which will use up FDs
faster than normal. (You won't notice this effect unless you have
hundreds of channels open at the same time.)

:return: an OS-level file descriptor (`int`)

.. warning::
This method causes channel reads to be slightly less efficient.
"""
Expand All @@ -941,6 +984,36 @@ def fileno(self):
finally:
self.lock.release()

def fileno_write(self):
"""
Returns an OS-level file descriptor which can be used for polling for
write availability on this channel, but not for actual reading or
writing. This is primarily to allow Python's ``select`` module to
work.

The first time ``fileno_read`` is called on a channel, a pipe is
created to simulate real OS-level file descriptor (FD) behavior.
Because of this, two OS-level FDs are created, which will use up FDs
faster than normal. (You won't notice this effect unless you have
hundreds of channels open at the same time.)

:return: an OS-level file descriptor (`int`)

.. warning::
This method causes channel writes to be slightly less efficient.
"""
self.lock.acquire()
try:
if self._write_pipe is not None:
return self._write_pipe.fileno()
# create the pipe and feed in any existing data
self._write_pipe = pipe.make_write_pipe()
# set the pipe status
self.out_window_size = self.out_window_size
return self._write_pipe.fileno()
finally:
self.lock.release()

def shutdown(self, how):
"""
Shut down one or both halves of the connection. If ``how`` is 0,
Expand Down Expand Up @@ -1234,6 +1307,8 @@ def _set_closed(self):
self.status_event.set()
if self._pipe is not None:
self._pipe.set_forever()
if self._write_pipe is not None:
self._write_pipe.clear_forever()

def _send_eof(self):
# you are holding the lock.
Expand Down Expand Up @@ -1349,6 +1424,9 @@ def __init__(self, channel, mode="r", bufsize=-1):
BufferedFile.__init__(self)
self._set_mode(mode, bufsize)

def fileno(self):
return self.channel.fileno_read()

def __repr__(self):
"""
Returns a string representation of this object, for debugging.
Expand Down Expand Up @@ -1385,6 +1463,9 @@ class ChannelStdinFile(ChannelFile):
See `Channel.makefile_stdin` for details.
"""

def fileno(self):
return self.channel.fileno_write()

def close(self):
super().close()
self.channel.shutdown_write()
100 changes: 100 additions & 0 deletions paramiko/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import sys
import os
import socket
import select


def make_pipe():
Expand All @@ -38,6 +39,51 @@ def make_pipe():
return p


def make_write_pipe():
if sys.platform[:3] != "win":
p = PosixWritePipe()
else:
p = WindowsWritePipe()
return p


class PosixWritePipe:
def __init__(self):
self._rsock, self._wsock = socket.socketpair()
self._wsock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4096)
for i in range(4096):
if len(select.select([], [self._wsock], [], 0)[1]) == 0:
break
self._wsock.send(b"\0")
self._set = False
self._closed = False
self._forever = False

def close(self):
os.close(self._rsock)
os.close(self._wsock)
self._closed = True

def fileno(self):
return self._wsock.fileno()

def clear(self):
if not self._set or self._closed:
return
self._wsock.send(b"\0")
self._set = False

def set(self):
if self._set or self._closed or self._forever:
return
self._set = True
self._rsock.recv(1)

def clear_forever(self):
self._forever = True
self.clear()


class PosixPipe:
def __init__(self):
self._rfd, self._wfd = os.pipe()
Expand Down Expand Up @@ -71,6 +117,60 @@ def set_forever(self):
self.set()


class WindowsWritePipe:
"""
On Windows, only an OS-level "WinSock" may be used in select(), but reads
and writes must be to the actual socket object.
"""

def __init__(self):
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.bind(("127.0.0.1", 0))
serv.listen(1)

# need to save sockets in _rsock/_wsock so they don't get closed
self._rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._rsock.connect(("127.0.0.1", serv.getsockname()[1]))

self._wsock, addr = serv.accept()
serv.close()

# Can this be done?
self._wsock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4096)
for i in range(4096):
if len(select.select([], [self._wsock], [], 0)[1]) == 0:
break
self._wsock.send(b"\0")

self._set = False
self._forever = False
self._closed = False

def close(self):
os.close(self._rsock)
os.close(self._wsock)
self._closed = True

def fileno(self):
return self._wsock.fileno()

def clear(self):
if not self._set or self._closed:
return
self._wsock.send(b"\0")
self._set = False

def set(self):
if self._set or self._closed or self._forever:
return
self._set = True
self._rsock.recv(1)

def clear_forever(self):
self._forever = True
self.clear()


class WindowsPipe:
"""
On Windows, only an OS-level "WinSock" may be used in select(), but reads
Expand Down