aboutsummaryrefslogtreecommitdiff
path: root/paramiko/sftp_file.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r--paramiko/sftp_file.py241
1 files changed, 205 insertions, 36 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index f224f02..cfa7db1 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+# Copyright (C) 2003-2007 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
@@ -20,7 +20,11 @@
L{SFTPFile}
"""
+from binascii import hexlify
+import socket
import threading
+import time
+
from paramiko.common import *
from paramiko.sftp import *
from paramiko.file import BufferedFile
@@ -43,12 +47,18 @@ class SFTPFile (BufferedFile):
BufferedFile._set_mode(self, mode, bufsize)
self.pipelined = False
self._prefetching = False
+ self._prefetch_done = False
+ self._prefetch_data = {}
+ self._prefetch_reads = []
self._saved_exception = None
def __del__(self):
- self.close(_async=True)
+ self._close(async=True)
+
+ def close(self):
+ self._close(async=False)
- def close(self, _async=False):
+ def _close(self, async=False):
# We allow double-close without signaling an error, because real
# Python file objects do. However, we must protect against actually
# sending multiple CMD_CLOSE packets, because after we close our
@@ -58,11 +68,12 @@ class SFTPFile (BufferedFile):
# __del__.)
if self._closed:
return
+ self.sftp._log(DEBUG, 'close(%s)' % hexlify(self.handle))
if self.pipelined:
self.sftp._finish_responses(self)
BufferedFile.close(self)
try:
- if _async:
+ if async:
# GC'd file handle could be called from an arbitrary thread -- don't wait for a response
self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
else:
@@ -70,34 +81,77 @@ class SFTPFile (BufferedFile):
except EOFError:
# may have outlived the Transport connection
pass
- except IOError:
+ except (IOError, socket.error):
# may have outlived the Transport connection
pass
+ def _data_in_prefetch_requests(self, offset, size):
+ k = [i for i in self._prefetch_reads if i[0] <= offset]
+ if len(k) == 0:
+ return False
+ k.sort(lambda x, y: cmp(x[0], y[0]))
+ buf_offset, buf_size = k[-1]
+ if buf_offset + buf_size <= offset:
+ # prefetch request ends before this one begins
+ return False
+ if buf_offset + buf_size >= offset + size:
+ # inclusive
+ return True
+ # well, we have part of the request. see if another chunk has the rest.
+ return self._data_in_prefetch_requests(buf_offset + buf_size, offset + size - buf_offset - buf_size)
+
+ def _data_in_prefetch_buffers(self, offset):
+ """
+ if a block of data is present in the prefetch buffers, at the given
+ offset, return the offset of the relevant prefetch buffer. otherwise,
+ return None. this guarantees nothing about the number of bytes
+ collected in the prefetch buffer so far.
+ """
+ k = [i for i in self._prefetch_data.keys() if i <= offset]
+ if len(k) == 0:
+ return None
+ index = max(k)
+ buf_offset = offset - index
+ if buf_offset >= len(self._prefetch_data[index]):
+ # it's not here
+ return None
+ return index
+
def _read_prefetch(self, size):
+ """
+ read data out of the prefetch buffer, if possible. if the data isn't
+ in the buffer, return None. otherwise, behaves like a normal read.
+ """
# while not closed, and haven't fetched past the current position, and haven't reached EOF...
- while (self._prefetch_so_far <= self._realpos) and \
- (self._prefetch_so_far < self._prefetch_size) and not self._closed:
+ while True:
+ offset = self._data_in_prefetch_buffers(self._realpos)
+ if offset is not None:
+ break
+ if self._prefetch_done or self._closed:
+ break
self.sftp._read_response()
- self._check_exception()
- k = self._prefetch_data.keys()
- k.sort()
- while (len(k) > 0) and (k[0] + len(self._prefetch_data[k[0]]) <= self._realpos):
- # done with that block
- del self._prefetch_data[k[0]]
- k.pop(0)
- if len(k) == 0:
+ self._check_exception()
+ if offset is None:
self._prefetching = False
- return ''
- assert k[0] <= self._realpos
- buf_offset = self._realpos - k[0]
- buf_length = len(self._prefetch_data[k[0]]) - buf_offset
- return self._prefetch_data[k[0]][buf_offset : buf_offset + buf_length]
+ return None
+ prefetch = self._prefetch_data[offset]
+ del self._prefetch_data[offset]
+
+ buf_offset = self._realpos - offset
+ if buf_offset > 0:
+ self._prefetch_data[offset] = prefetch[:buf_offset]
+ prefetch = prefetch[buf_offset:]
+ if size < len(prefetch):
+ self._prefetch_data[self._realpos + size] = prefetch[size:]
+ prefetch = prefetch[:size]
+ return prefetch
def _read(self, size):
size = min(size, self.MAX_REQUEST_SIZE)
if self._prefetching:
- return self._read_prefetch(size)
+ data = self._read_prefetch(size)
+ if data is not None:
+ return data
t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size))
if t != CMD_DATA:
raise SFTPError('Expected data')
@@ -106,8 +160,7 @@ class SFTPFile (BufferedFile):
def _write(self, data):
# may write less than requested if it would exceed max packet size
chunk = min(len(data), self.MAX_REQUEST_SIZE)
- req = self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos),
- str(data[:chunk]))
+ req = self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk]))
if not self.pipelined or self.sftp.sock.recv_ready():
t, msg = self.sftp._read_response(req)
if t != CMD_STATUS:
@@ -173,6 +226,71 @@ class SFTPFile (BufferedFile):
if t != CMD_ATTRS:
raise SFTPError('Expected attributes')
return SFTPAttributes._from_msg(msg)
+
+ def chmod(self, mode):
+ """
+ Change the mode (permissions) of this file. The permissions are
+ unix-style and identical to those used by python's C{os.chmod}
+ function.
+
+ @param mode: new permissions
+ @type mode: int
+ """
+ self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode))
+ attr = SFTPAttributes()
+ attr.st_mode = mode
+ self.sftp._request(CMD_FSETSTAT, self.handle, attr)
+
+ def chown(self, uid, gid):
+ """
+ Change the owner (C{uid}) and group (C{gid}) of this file. As with
+ python's C{os.chown} function, you must pass both arguments, so if you
+ only want to change one, use L{stat} first to retrieve the current
+ owner and group.
+
+ @param uid: new owner's uid
+ @type uid: int
+ @param gid: new group id
+ @type gid: int
+ """
+ self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
+ attr = SFTPAttributes()
+ attr.st_uid, attr.st_gid = uid, gid
+ self.sftp._request(CMD_FSETSTAT, self.handle, attr)
+
+ def utime(self, times):
+ """
+ Set the access and modified times of this file. If
+ C{times} is C{None}, then the file's access and modified times are set
+ to the current time. Otherwise, C{times} must be a 2-tuple of numbers,
+ of the form C{(atime, mtime)}, which is used to set the access and
+ modified times, respectively. This bizarre API is mimicked from python
+ for the sake of consistency -- I apologize.
+
+ @param times: C{None} or a tuple of (access time, modified time) in
+ standard internet epoch time (seconds since 01 January 1970 GMT)
+ @type times: tuple(int)
+ """
+ if times is None:
+ times = (time.time(), time.time())
+ self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times))
+ attr = SFTPAttributes()
+ attr.st_atime, attr.st_mtime = times
+ self.sftp._request(CMD_FSETSTAT, self.handle, attr)
+
+ def truncate(self, size):
+ """
+ Change the size of this file. This usually extends
+ or shrinks the size of the file, just like the C{truncate()} method on
+ python file objects.
+
+ @param size: the new size of the file
+ @type size: int or long
+ """
+ self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size))
+ attr = SFTPAttributes()
+ attr.st_size = size
+ self.sftp._request(CMD_FSETSTAT, self.handle, attr)
def check(self, hash_algorithm, offset=0, length=0, block_size=0):
"""
@@ -255,26 +373,60 @@ class SFTPFile (BufferedFile):
dramatically improve the download speed by avoiding roundtrip latency.
The file's contents are incrementally buffered in a background thread.
+ The prefetched data is stored in a buffer until read via the L{read}
+ method. Once data has been read, it's removed from the buffer. The
+ data may be read in a random order (using L{seek}); chunks of the
+ buffer that haven't been read will continue to be buffered.
+
@since: 1.5.1
"""
size = self.stat().st_size
# queue up async reads for the rest of the file
- self._prefetching = True
- self._prefetch_so_far = self._realpos
- self._prefetch_size = size
- self._prefetch_data = {}
- t = threading.Thread(target=self._prefetch)
- t.setDaemon(True)
- t.start()
-
- def _prefetch(self):
+ chunks = []
n = self._realpos
- size = self._prefetch_size
while n < size:
chunk = min(self.MAX_REQUEST_SIZE, size - n)
- self.sftp._async_request(self, CMD_READ, self.handle, long(n), int(chunk))
+ chunks.append((n, chunk))
n += chunk
+ if len(chunks) > 0:
+ self._start_prefetch(chunks)
+
+ def readv(self, chunks):
+ """
+ Read a set of blocks from the file by (offset, length). This is more
+ efficient than doing a series of L{seek} and L{read} calls, since the
+ prefetch machinery is used to retrieve all the requested blocks at
+ once.
+
+ @param chunks: a list of (offset, length) tuples indicating which
+ sections of the file to read
+ @type chunks: list(tuple(long, int))
+ @return: a list of blocks read, in the same order as in C{chunks}
+ @rtype: list(str)
+
+ @since: 1.5.4
+ """
+ self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks))
+ read_chunks = []
+ for offset, size in chunks:
+ # don't fetch data that's already in the prefetch buffer
+ if self._data_in_prefetch_buffers(offset) or self._data_in_prefetch_requests(offset, size):
+ continue
+
+ # break up anything larger than the max read size
+ while size > 0:
+ chunk_size = min(size, self.MAX_REQUEST_SIZE)
+ read_chunks.append((offset, chunk_size))
+ offset += chunk_size
+ size -= chunk_size
+
+ self._start_prefetch(read_chunks)
+ # now we can just devolve to a bunch of read()s :)
+ for x in chunks:
+ self.seek(x[0])
+ yield self.read(x[1])
+
### internals...
@@ -285,6 +437,21 @@ class SFTPFile (BufferedFile):
except:
return 0
+ def _start_prefetch(self, chunks):
+ self._prefetching = True
+ self._prefetch_done = False
+ self._prefetch_reads.extend(chunks)
+
+ t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
+ t.setDaemon(True)
+ t.start()
+
+ def _prefetch_thread(self, chunks):
+ # do these read requests in a temporary thread because there may be
+ # a lot of them, so it may block.
+ for offset, length in chunks:
+ self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
+
def _async_response(self, t, msg):
if t == CMD_STATUS:
# save exception and re-raise it on next file operation
@@ -296,8 +463,10 @@ class SFTPFile (BufferedFile):
if t != CMD_DATA:
raise SFTPError('Expected data')
data = msg.get_string()
- self._prefetch_data[self._prefetch_so_far] = data
- self._prefetch_so_far += len(data)
+ offset, length = self._prefetch_reads.pop(0)
+ self._prefetch_data[offset] = data
+ if len(self._prefetch_reads) == 0:
+ self._prefetch_done = True
def _check_exception(self):
"if there's a saved exception, raise & clear it"