diff options
Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r-- | paramiko/sftp_file.py | 241 |
1 files changed, 205 insertions, 36 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index f224f02..cfa7db1 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -1,4 +1,4 @@ -# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# Copyright (C) 2003-2007 Robey Pointer <robey@lag.net> # # This file is part of paramiko. # @@ -20,7 +20,11 @@ L{SFTPFile} """ +from binascii import hexlify +import socket import threading +import time + from paramiko.common import * from paramiko.sftp import * from paramiko.file import BufferedFile @@ -43,12 +47,18 @@ class SFTPFile (BufferedFile): BufferedFile._set_mode(self, mode, bufsize) self.pipelined = False self._prefetching = False + self._prefetch_done = False + self._prefetch_data = {} + self._prefetch_reads = [] self._saved_exception = None def __del__(self): - self.close(_async=True) + self._close(async=True) + + def close(self): + self._close(async=False) - def close(self, _async=False): + def _close(self, async=False): # We allow double-close without signaling an error, because real # Python file objects do. However, we must protect against actually # sending multiple CMD_CLOSE packets, because after we close our @@ -58,11 +68,12 @@ class SFTPFile (BufferedFile): # __del__.) if self._closed: return + self.sftp._log(DEBUG, 'close(%s)' % hexlify(self.handle)) if self.pipelined: self.sftp._finish_responses(self) BufferedFile.close(self) try: - if _async: + if async: # GC'd file handle could be called from an arbitrary thread -- don't wait for a response self.sftp._async_request(type(None), CMD_CLOSE, self.handle) else: @@ -70,34 +81,77 @@ class SFTPFile (BufferedFile): except EOFError: # may have outlived the Transport connection pass - except IOError: + except (IOError, socket.error): # may have outlived the Transport connection pass + def _data_in_prefetch_requests(self, offset, size): + k = [i for i in self._prefetch_reads if i[0] <= offset] + if len(k) == 0: + return False + k.sort(lambda x, y: cmp(x[0], y[0])) + buf_offset, buf_size = k[-1] + if buf_offset + buf_size <= offset: + # prefetch request ends before this one begins + return False + if buf_offset + buf_size >= offset + size: + # inclusive + return True + # well, we have part of the request. see if another chunk has the rest. + return self._data_in_prefetch_requests(buf_offset + buf_size, offset + size - buf_offset - buf_size) + + def _data_in_prefetch_buffers(self, offset): + """ + if a block of data is present in the prefetch buffers, at the given + offset, return the offset of the relevant prefetch buffer. otherwise, + return None. this guarantees nothing about the number of bytes + collected in the prefetch buffer so far. + """ + k = [i for i in self._prefetch_data.keys() if i <= offset] + if len(k) == 0: + return None + index = max(k) + buf_offset = offset - index + if buf_offset >= len(self._prefetch_data[index]): + # it's not here + return None + return index + def _read_prefetch(self, size): + """ + read data out of the prefetch buffer, if possible. if the data isn't + in the buffer, return None. otherwise, behaves like a normal read. + """ # while not closed, and haven't fetched past the current position, and haven't reached EOF... - while (self._prefetch_so_far <= self._realpos) and \ - (self._prefetch_so_far < self._prefetch_size) and not self._closed: + while True: + offset = self._data_in_prefetch_buffers(self._realpos) + if offset is not None: + break + if self._prefetch_done or self._closed: + break self.sftp._read_response() - self._check_exception() - k = self._prefetch_data.keys() - k.sort() - while (len(k) > 0) and (k[0] + len(self._prefetch_data[k[0]]) <= self._realpos): - # done with that block - del self._prefetch_data[k[0]] - k.pop(0) - if len(k) == 0: + self._check_exception() + if offset is None: self._prefetching = False - return '' - assert k[0] <= self._realpos - buf_offset = self._realpos - k[0] - buf_length = len(self._prefetch_data[k[0]]) - buf_offset - return self._prefetch_data[k[0]][buf_offset : buf_offset + buf_length] + return None + prefetch = self._prefetch_data[offset] + del self._prefetch_data[offset] + + buf_offset = self._realpos - offset + if buf_offset > 0: + self._prefetch_data[offset] = prefetch[:buf_offset] + prefetch = prefetch[buf_offset:] + if size < len(prefetch): + self._prefetch_data[self._realpos + size] = prefetch[size:] + prefetch = prefetch[:size] + return prefetch def _read(self, size): size = min(size, self.MAX_REQUEST_SIZE) if self._prefetching: - return self._read_prefetch(size) + data = self._read_prefetch(size) + if data is not None: + return data t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size)) if t != CMD_DATA: raise SFTPError('Expected data') @@ -106,8 +160,7 @@ class SFTPFile (BufferedFile): def _write(self, data): # may write less than requested if it would exceed max packet size chunk = min(len(data), self.MAX_REQUEST_SIZE) - req = self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), - str(data[:chunk])) + req = self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk])) if not self.pipelined or self.sftp.sock.recv_ready(): t, msg = self.sftp._read_response(req) if t != CMD_STATUS: @@ -173,6 +226,71 @@ class SFTPFile (BufferedFile): if t != CMD_ATTRS: raise SFTPError('Expected attributes') return SFTPAttributes._from_msg(msg) + + def chmod(self, mode): + """ + Change the mode (permissions) of this file. The permissions are + unix-style and identical to those used by python's C{os.chmod} + function. + + @param mode: new permissions + @type mode: int + """ + self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode)) + attr = SFTPAttributes() + attr.st_mode = mode + self.sftp._request(CMD_FSETSTAT, self.handle, attr) + + def chown(self, uid, gid): + """ + Change the owner (C{uid}) and group (C{gid}) of this file. As with + python's C{os.chown} function, you must pass both arguments, so if you + only want to change one, use L{stat} first to retrieve the current + owner and group. + + @param uid: new owner's uid + @type uid: int + @param gid: new group id + @type gid: int + """ + self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid)) + attr = SFTPAttributes() + attr.st_uid, attr.st_gid = uid, gid + self.sftp._request(CMD_FSETSTAT, self.handle, attr) + + def utime(self, times): + """ + Set the access and modified times of this file. If + C{times} is C{None}, then the file's access and modified times are set + to the current time. Otherwise, C{times} must be a 2-tuple of numbers, + of the form C{(atime, mtime)}, which is used to set the access and + modified times, respectively. This bizarre API is mimicked from python + for the sake of consistency -- I apologize. + + @param times: C{None} or a tuple of (access time, modified time) in + standard internet epoch time (seconds since 01 January 1970 GMT) + @type times: tuple(int) + """ + if times is None: + times = (time.time(), time.time()) + self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times)) + attr = SFTPAttributes() + attr.st_atime, attr.st_mtime = times + self.sftp._request(CMD_FSETSTAT, self.handle, attr) + + def truncate(self, size): + """ + Change the size of this file. This usually extends + or shrinks the size of the file, just like the C{truncate()} method on + python file objects. + + @param size: the new size of the file + @type size: int or long + """ + self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size)) + attr = SFTPAttributes() + attr.st_size = size + self.sftp._request(CMD_FSETSTAT, self.handle, attr) def check(self, hash_algorithm, offset=0, length=0, block_size=0): """ @@ -255,26 +373,60 @@ class SFTPFile (BufferedFile): dramatically improve the download speed by avoiding roundtrip latency. The file's contents are incrementally buffered in a background thread. + The prefetched data is stored in a buffer until read via the L{read} + method. Once data has been read, it's removed from the buffer. The + data may be read in a random order (using L{seek}); chunks of the + buffer that haven't been read will continue to be buffered. + @since: 1.5.1 """ size = self.stat().st_size # queue up async reads for the rest of the file - self._prefetching = True - self._prefetch_so_far = self._realpos - self._prefetch_size = size - self._prefetch_data = {} - t = threading.Thread(target=self._prefetch) - t.setDaemon(True) - t.start() - - def _prefetch(self): + chunks = [] n = self._realpos - size = self._prefetch_size while n < size: chunk = min(self.MAX_REQUEST_SIZE, size - n) - self.sftp._async_request(self, CMD_READ, self.handle, long(n), int(chunk)) + chunks.append((n, chunk)) n += chunk + if len(chunks) > 0: + self._start_prefetch(chunks) + + def readv(self, chunks): + """ + Read a set of blocks from the file by (offset, length). This is more + efficient than doing a series of L{seek} and L{read} calls, since the + prefetch machinery is used to retrieve all the requested blocks at + once. + + @param chunks: a list of (offset, length) tuples indicating which + sections of the file to read + @type chunks: list(tuple(long, int)) + @return: a list of blocks read, in the same order as in C{chunks} + @rtype: list(str) + + @since: 1.5.4 + """ + self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks)) + read_chunks = [] + for offset, size in chunks: + # don't fetch data that's already in the prefetch buffer + if self._data_in_prefetch_buffers(offset) or self._data_in_prefetch_requests(offset, size): + continue + + # break up anything larger than the max read size + while size > 0: + chunk_size = min(size, self.MAX_REQUEST_SIZE) + read_chunks.append((offset, chunk_size)) + offset += chunk_size + size -= chunk_size + + self._start_prefetch(read_chunks) + # now we can just devolve to a bunch of read()s :) + for x in chunks: + self.seek(x[0]) + yield self.read(x[1]) + ### internals... @@ -285,6 +437,21 @@ class SFTPFile (BufferedFile): except: return 0 + def _start_prefetch(self, chunks): + self._prefetching = True + self._prefetch_done = False + self._prefetch_reads.extend(chunks) + + t = threading.Thread(target=self._prefetch_thread, args=(chunks,)) + t.setDaemon(True) + t.start() + + def _prefetch_thread(self, chunks): + # do these read requests in a temporary thread because there may be + # a lot of them, so it may block. + for offset, length in chunks: + self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + def _async_response(self, t, msg): if t == CMD_STATUS: # save exception and re-raise it on next file operation @@ -296,8 +463,10 @@ class SFTPFile (BufferedFile): if t != CMD_DATA: raise SFTPError('Expected data') data = msg.get_string() - self._prefetch_data[self._prefetch_so_far] = data - self._prefetch_so_far += len(data) + offset, length = self._prefetch_reads.pop(0) + self._prefetch_data[offset] = data + if len(self._prefetch_reads) == 0: + self._prefetch_done = True def _check_exception(self): "if there's a saved exception, raise & clear it" |