aboutsummaryrefslogtreecommitdiff
path: root/paramiko/buffered_pipe.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/buffered_pipe.py')
-rw-r--r--paramiko/buffered_pipe.py83
1 files changed, 46 insertions, 37 deletions
diff --git a/paramiko/buffered_pipe.py b/paramiko/buffered_pipe.py
index 4ef5cf7..ac35b3e 100644
--- a/paramiko/buffered_pipe.py
+++ b/paramiko/buffered_pipe.py
@@ -17,7 +17,7 @@
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
"""
-Attempt to generalize the "feeder" part of a Channel: an object which can be
+Attempt to generalize the "feeder" part of a `.Channel`: an object which can be
read from and closed, but is reading from a buffer fed by another thread. The
read operations are blocking and can have a timeout set.
"""
@@ -25,11 +25,12 @@ read operations are blocking and can have a timeout set.
import array
import threading
import time
+from paramiko.py3compat import PY2, b
class PipeTimeout (IOError):
"""
- Indicates that a timeout was reached on a read from a L{BufferedPipe}.
+ Indicates that a timeout was reached on a read from a `.BufferedPipe`.
"""
pass
@@ -38,7 +39,7 @@ class BufferedPipe (object):
"""
A buffer that obeys normal read (with timeout) & close semantics for a
file or socket, but is fed data from another thread. This is used by
- L{Channel}.
+ `.Channel`.
"""
def __init__(self):
@@ -48,14 +49,26 @@ class BufferedPipe (object):
self._buffer = array.array('B')
self._closed = False
+ if PY2:
+ def _buffer_frombytes(self, data):
+ self._buffer.fromstring(data)
+
+ def _buffer_tobytes(self, limit=None):
+ return self._buffer[:limit].tostring()
+ else:
+ def _buffer_frombytes(self, data):
+ self._buffer.frombytes(data)
+
+ def _buffer_tobytes(self, limit=None):
+ return self._buffer[:limit].tobytes()
+
def set_event(self, event):
"""
Set an event on this buffer. When data is ready to be read (or the
buffer has been closed), the event will be set. When no data is
ready, the event will be cleared.
- @param event: the event to set/clear
- @type event: Event
+ :param threading.Event event: the event to set/clear
"""
self._event = event
if len(self._buffer) > 0:
@@ -68,14 +81,13 @@ class BufferedPipe (object):
Feed new data into this pipe. This method is assumed to be called
from a separate thread, so synchronization is done.
- @param data: the data to add
- @type data: str
+ :param data: the data to add, as a `str`
"""
self._lock.acquire()
try:
if self._event is not None:
self._event.set()
- self._buffer.fromstring(data)
+ self._buffer_frombytes(b(data))
self._cv.notifyAll()
finally:
self._lock.release()
@@ -83,12 +95,12 @@ class BufferedPipe (object):
def read_ready(self):
"""
Returns true if data is buffered and ready to be read from this
- feeder. A C{False} result does not mean that the feeder has closed;
+ feeder. A ``False`` result does not mean that the feeder has closed;
it means you may need to wait before more data arrives.
- @return: C{True} if a L{read} call would immediately return at least
- one byte; C{False} otherwise.
- @rtype: bool
+ :return:
+ ``True`` if a `read` call would immediately return at least one
+ byte; ``False`` otherwise.
"""
self._lock.acquire()
try:
@@ -102,26 +114,24 @@ class BufferedPipe (object):
"""
Read data from the pipe. The return value is a string representing
the data received. The maximum amount of data to be received at once
- is specified by C{nbytes}. If a string of length zero is returned,
+ is specified by ``nbytes``. If a string of length zero is returned,
the pipe has been closed.
- The optional C{timeout} argument can be a nonnegative float expressing
- seconds, or C{None} for no timeout. If a float is given, a
- C{PipeTimeout} will be raised if the timeout period value has
- elapsed before any data arrives.
-
- @param nbytes: maximum number of bytes to read
- @type nbytes: int
- @param timeout: maximum seconds to wait (or C{None}, the default, to
- wait forever)
- @type timeout: float
- @return: data
- @rtype: str
+ The optional ``timeout`` argument can be a nonnegative float expressing
+ seconds, or ``None`` for no timeout. If a float is given, a
+ `.PipeTimeout` will be raised if the timeout period value has elapsed
+ before any data arrives.
+
+ :param int nbytes: maximum number of bytes to read
+ :param float timeout:
+ maximum seconds to wait (or ``None``, the default, to wait forever)
+ :return: the read data, as a `str`
- @raise PipeTimeout: if a timeout was specified and no data was ready
- before that timeout
+ :raises PipeTimeout:
+ if a timeout was specified and no data was ready before that
+ timeout
"""
- out = ''
+ out = bytes()
self._lock.acquire()
try:
if len(self._buffer) == 0:
@@ -142,12 +152,12 @@ class BufferedPipe (object):
# something's in the buffer and we have the lock!
if len(self._buffer) <= nbytes:
- out = self._buffer.tostring()
+ out = self._buffer_tobytes()
del self._buffer[:]
if (self._event is not None) and not self._closed:
self._event.clear()
else:
- out = self._buffer[:nbytes].tostring()
+ out = self._buffer_tobytes(nbytes)
del self._buffer[:nbytes]
finally:
self._lock.release()
@@ -158,12 +168,13 @@ class BufferedPipe (object):
"""
Clear out the buffer and return all data that was in it.
- @return: any data that was in the buffer prior to clearing it out
- @rtype: str
+ :return:
+ any data that was in the buffer prior to clearing it out, as a
+ `str`
"""
self._lock.acquire()
try:
- out = self._buffer.tostring()
+ out = self._buffer_tobytes()
del self._buffer[:]
if (self._event is not None) and not self._closed:
self._event.clear()
@@ -173,7 +184,7 @@ class BufferedPipe (object):
def close(self):
"""
- Close this pipe object. Future calls to L{read} after the buffer
+ Close this pipe object. Future calls to `read` after the buffer
has been emptied will return immediately with an empty string.
"""
self._lock.acquire()
@@ -189,12 +200,10 @@ class BufferedPipe (object):
"""
Return the number of bytes buffered.
- @return: number of bytes bufferes
- @rtype: int
+ :return: number (`int`) of bytes buffered
"""
self._lock.acquire()
try:
return len(self._buffer)
finally:
self._lock.release()
-