Package paramiko :: Module sftp_file
[frames] | no frames]

Source Code for Module paramiko.sftp_file

  1  # Copyright (C) 2003-2007  Robey Pointer <robey@lag.net> 
  2  # 
  3  # This file is part of paramiko. 
  4  # 
  5  # Paramiko is free software; you can redistribute it and/or modify it under the 
  6  # terms of the GNU Lesser General Public License as published by the Free 
  7  # Software Foundation; either version 2.1 of the License, or (at your option) 
  8  # any later version. 
  9  # 
 10  # Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY 
 11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
 12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 13  # details. 
 14  # 
 15  # You should have received a copy of the GNU Lesser General Public License 
 16  # along with Paramiko; if not, write to the Free Software Foundation, Inc., 
 17  # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. 
 18   
 19  """ 
 20  L{SFTPFile} 
 21  """ 
 22   
 23  from binascii import hexlify 
 24  import socket 
 25  import threading 
 26  import time 
 27   
 28  from paramiko.common import * 
 29  from paramiko.sftp import * 
 30  from paramiko.file import BufferedFile 
 31  from paramiko.sftp_attr import SFTPAttributes 
 32   
 33   
34 -class SFTPFile (BufferedFile):
35 """ 36 Proxy object for a file on the remote server, in client mode SFTP. 37 """ 38 39 # Some sftp servers will choke if you send read/write requests larger than 40 # this size. 41 MAX_REQUEST_SIZE = 32768 42
43 - def __init__(self, sftp, handle, mode='r', bufsize=-1):
44 BufferedFile.__init__(self) 45 self.sftp = sftp 46 self.handle = handle 47 BufferedFile._set_mode(self, mode, bufsize) 48 self.pipelined = False 49 self._prefetching = False 50 self._prefetch_done = False 51 self._prefetch_data = {} 52 self._prefetch_reads = [] 53 self._saved_exception = None
54
55 - def __del__(self):
56 self._close(async=True)
57
58 - def close(self):
59 self._close(async=False)
60
61 - def _close(self, async=False):
62 # We allow double-close without signaling an error, because real 63 # Python file objects do. However, we must protect against actually 64 # sending multiple CMD_CLOSE packets, because after we close our 65 # handle, the same handle may be re-allocated by the server, and we 66 # may end up mysteriously closing some random other file. (This is 67 # especially important because we unconditionally call close() from 68 # __del__.) 69 if self._closed: 70 return 71 self.sftp._log(DEBUG, 'close(%s)' % hexlify(self.handle)) 72 if self.pipelined: 73 self.sftp._finish_responses(self) 74 BufferedFile.close(self) 75 try: 76 if async: 77 # GC'd file handle could be called from an arbitrary thread -- don't wait for a response 78 self.sftp._async_request(type(None), CMD_CLOSE, self.handle) 79 else: 80 self.sftp._request(CMD_CLOSE, self.handle) 81 except EOFError: 82 # may have outlived the Transport connection 83 pass 84 except (IOError, socket.error): 85 # may have outlived the Transport connection 86 pass
87
88 - def _data_in_prefetch_requests(self, offset, size):
89 k = [i for i in self._prefetch_reads if i[0] <= offset] 90 if len(k) == 0: 91 return False 92 k.sort(lambda x, y: cmp(x[0], y[0])) 93 buf_offset, buf_size = k[-1] 94 if buf_offset + buf_size <= offset: 95 # prefetch request ends before this one begins 96 return False 97 if buf_offset + buf_size >= offset + size: 98 # inclusive 99 return True 100 # well, we have part of the request. see if another chunk has the rest. 101 return self._data_in_prefetch_requests(buf_offset + buf_size, offset + size - buf_offset - buf_size)
102
103 - def _data_in_prefetch_buffers(self, offset):
104 """ 105 if a block of data is present in the prefetch buffers, at the given 106 offset, return the offset of the relevant prefetch buffer. otherwise, 107 return None. this guarantees nothing about the number of bytes 108 collected in the prefetch buffer so far. 109 """ 110 k = [i for i in self._prefetch_data.keys() if i <= offset] 111 if len(k) == 0: 112 return None 113 index = max(k) 114 buf_offset = offset - index 115 if buf_offset >= len(self._prefetch_data[index]): 116 # it's not here 117 return None 118 return index
119
120 - def _read_prefetch(self, size):
121 """ 122 read data out of the prefetch buffer, if possible. if the data isn't 123 in the buffer, return None. otherwise, behaves like a normal read. 124 """ 125 # while not closed, and haven't fetched past the current position, and haven't reached EOF... 126 while True: 127 offset = self._data_in_prefetch_buffers(self._realpos) 128 if offset is not None: 129 break 130 if self._prefetch_done or self._closed: 131 break 132 self.sftp._read_response() 133 self._check_exception() 134 if offset is None: 135 self._prefetching = False 136 return None 137 prefetch = self._prefetch_data[offset] 138 del self._prefetch_data[offset] 139 140 buf_offset = self._realpos - offset 141 if buf_offset > 0: 142 self._prefetch_data[offset] = prefetch[:buf_offset] 143 prefetch = prefetch[buf_offset:] 144 if size < len(prefetch): 145 self._prefetch_data[self._realpos + size] = prefetch[size:] 146 prefetch = prefetch[:size] 147 return prefetch
148
149 - def _read(self, size):
150 size = min(size, self.MAX_REQUEST_SIZE) 151 if self._prefetching: 152 data = self._read_prefetch(size) 153 if data is not None: 154 return data 155 t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size)) 156 if t != CMD_DATA: 157 raise SFTPError('Expected data') 158 return msg.get_string()
159
160 - def _write(self, data):
161 # may write less than requested if it would exceed max packet size 162 chunk = min(len(data), self.MAX_REQUEST_SIZE) 163 req = self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk])) 164 if not self.pipelined or self.sftp.sock.recv_ready(): 165 t, msg = self.sftp._read_response(req) 166 if t != CMD_STATUS: 167 raise SFTPError('Expected status') 168 # convert_status already called 169 return chunk
170
171 - def settimeout(self, timeout):
172 """ 173 Set a timeout on read/write operations on the underlying socket or 174 ssh L{Channel}. 175 176 @see: L{Channel.settimeout} 177 @param timeout: seconds to wait for a pending read/write operation 178 before raising C{socket.timeout}, or C{None} for no timeout 179 @type timeout: float 180 """ 181 self.sftp.sock.settimeout(timeout)
182
183 - def gettimeout(self):
184 """ 185 Returns the timeout in seconds (as a float) associated with the socket 186 or ssh L{Channel} used for this file. 187 188 @see: L{Channel.gettimeout} 189 @rtype: float 190 """ 191 return self.sftp.sock.gettimeout()
192
193 - def setblocking(self, blocking):
194 """ 195 Set blocking or non-blocking mode on the underiying socket or ssh 196 L{Channel}. 197 198 @see: L{Channel.setblocking} 199 @param blocking: 0 to set non-blocking mode; non-0 to set blocking 200 mode. 201 @type blocking: int 202 """ 203 self.sftp.sock.setblocking(blocking)
204
205 - def seek(self, offset, whence=0):
206 self.flush() 207 if whence == self.SEEK_SET: 208 self._realpos = self._pos = offset 209 elif whence == self.SEEK_CUR: 210 self._pos += offset 211 self._realpos = self._pos 212 else: 213 self._realpos = self._pos = self._get_size() + offset 214 self._rbuffer = ''
215
216 - def stat(self):
217 """ 218 Retrieve information about this file from the remote system. This is 219 exactly like L{SFTP.stat}, except that it operates on an already-open 220 file. 221 222 @return: an object containing attributes about this file. 223 @rtype: SFTPAttributes 224 """ 225 t, msg = self.sftp._request(CMD_FSTAT, self.handle) 226 if t != CMD_ATTRS: 227 raise SFTPError('Expected attributes') 228 return SFTPAttributes._from_msg(msg)
229
230 - def chmod(self, mode):
231 """ 232 Change the mode (permissions) of this file. The permissions are 233 unix-style and identical to those used by python's C{os.chmod} 234 function. 235 236 @param mode: new permissions 237 @type mode: int 238 """ 239 self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode)) 240 attr = SFTPAttributes() 241 attr.st_mode = mode 242 self.sftp._request(CMD_FSETSTAT, self.handle, attr)
243
244 - def chown(self, uid, gid):
245 """ 246 Change the owner (C{uid}) and group (C{gid}) of this file. As with 247 python's C{os.chown} function, you must pass both arguments, so if you 248 only want to change one, use L{stat} first to retrieve the current 249 owner and group. 250 251 @param uid: new owner's uid 252 @type uid: int 253 @param gid: new group id 254 @type gid: int 255 """ 256 self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid)) 257 attr = SFTPAttributes() 258 attr.st_uid, attr.st_gid = uid, gid 259 self.sftp._request(CMD_FSETSTAT, self.handle, attr)
260
261 - def utime(self, times):
262 """ 263 Set the access and modified times of this file. If 264 C{times} is C{None}, then the file's access and modified times are set 265 to the current time. Otherwise, C{times} must be a 2-tuple of numbers, 266 of the form C{(atime, mtime)}, which is used to set the access and 267 modified times, respectively. This bizarre API is mimicked from python 268 for the sake of consistency -- I apologize. 269 270 @param times: C{None} or a tuple of (access time, modified time) in 271 standard internet epoch time (seconds since 01 January 1970 GMT) 272 @type times: tuple(int) 273 """ 274 if times is None: 275 times = (time.time(), time.time()) 276 self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times)) 277 attr = SFTPAttributes() 278 attr.st_atime, attr.st_mtime = times 279 self.sftp._request(CMD_FSETSTAT, self.handle, attr)
280
281 - def truncate(self, size):
282 """ 283 Change the size of this file. This usually extends 284 or shrinks the size of the file, just like the C{truncate()} method on 285 python file objects. 286 287 @param size: the new size of the file 288 @type size: int or long 289 """ 290 self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size)) 291 attr = SFTPAttributes() 292 attr.st_size = size 293 self.sftp._request(CMD_FSETSTAT, self.handle, attr)
294
295 - def check(self, hash_algorithm, offset=0, length=0, block_size=0):
296 """ 297 Ask the server for a hash of a section of this file. This can be used 298 to verify a successful upload or download, or for various rsync-like 299 operations. 300 301 The file is hashed from C{offset}, for C{length} bytes. If C{length} 302 is 0, the remainder of the file is hashed. Thus, if both C{offset} 303 and C{length} are zero, the entire file is hashed. 304 305 Normally, C{block_size} will be 0 (the default), and this method will 306 return a byte string representing the requested hash (for example, a 307 string of length 16 for MD5, or 20 for SHA-1). If a non-zero 308 C{block_size} is given, each chunk of the file (from C{offset} to 309 C{offset + length}) of C{block_size} bytes is computed as a separate 310 hash. The hash results are all concatenated and returned as a single 311 string. 312 313 For example, C{check('sha1', 0, 1024, 512)} will return a string of 314 length 40. The first 20 bytes will be the SHA-1 of the first 512 bytes 315 of the file, and the last 20 bytes will be the SHA-1 of the next 512 316 bytes. 317 318 @param hash_algorithm: the name of the hash algorithm to use (normally 319 C{"sha1"} or C{"md5"}) 320 @type hash_algorithm: str 321 @param offset: offset into the file to begin hashing (0 means to start 322 from the beginning) 323 @type offset: int or long 324 @param length: number of bytes to hash (0 means continue to the end of 325 the file) 326 @type length: int or long 327 @param block_size: number of bytes to hash per result (must not be less 328 than 256; 0 means to compute only one hash of the entire segment) 329 @type block_size: int 330 @return: string of bytes representing the hash of each block, 331 concatenated together 332 @rtype: str 333 334 @note: Many (most?) servers don't support this extension yet. 335 336 @raise IOError: if the server doesn't support the "check-file" 337 extension, or possibly doesn't support the hash algorithm 338 requested 339 340 @since: 1.4 341 """ 342 t, msg = self.sftp._request(CMD_EXTENDED, 'check-file', self.handle, 343 hash_algorithm, long(offset), long(length), block_size) 344 ext = msg.get_string() 345 alg = msg.get_string() 346 data = msg.get_remainder() 347 return data
348
349 - def set_pipelined(self, pipelined=True):
350 """ 351 Turn on/off the pipelining of write operations to this file. When 352 pipelining is on, paramiko won't wait for the server response after 353 each write operation. Instead, they're collected as they come in. 354 At the first non-write operation (including L{close}), all remaining 355 server responses are collected. This means that if there was an error 356 with one of your later writes, an exception might be thrown from 357 within L{close} instead of L{write}. 358 359 By default, files are I{not} pipelined. 360 361 @param pipelined: C{True} if pipelining should be turned on for this 362 file; C{False} otherwise 363 @type pipelined: bool 364 365 @since: 1.5 366 """ 367 self.pipelined = pipelined
368
369 - def prefetch(self):
370 """ 371 Pre-fetch the remaining contents of this file in anticipation of 372 future L{read} calls. If reading the entire file, pre-fetching can 373 dramatically improve the download speed by avoiding roundtrip latency. 374 The file's contents are incrementally buffered in a background thread. 375 376 The prefetched data is stored in a buffer until read via the L{read} 377 method. Once data has been read, it's removed from the buffer. The 378 data may be read in a random order (using L{seek}); chunks of the 379 buffer that haven't been read will continue to be buffered. 380 381 @since: 1.5.1 382 """ 383 size = self.stat().st_size 384 # queue up async reads for the rest of the file 385 chunks = [] 386 n = self._realpos 387 while n < size: 388 chunk = min(self.MAX_REQUEST_SIZE, size - n) 389 chunks.append((n, chunk)) 390 n += chunk 391 if len(chunks) > 0: 392 self._start_prefetch(chunks)
393
394 - def readv(self, chunks):
395 """ 396 Read a set of blocks from the file by (offset, length). This is more 397 efficient than doing a series of L{seek} and L{read} calls, since the 398 prefetch machinery is used to retrieve all the requested blocks at 399 once. 400 401 @param chunks: a list of (offset, length) tuples indicating which 402 sections of the file to read 403 @type chunks: list(tuple(long, int)) 404 @return: a list of blocks read, in the same order as in C{chunks} 405 @rtype: list(str) 406 407 @since: 1.5.4 408 """ 409 self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks)) 410 411 read_chunks = [] 412 for offset, size in chunks: 413 # don't fetch data that's already in the prefetch buffer 414 if self._data_in_prefetch_buffers(offset) or self._data_in_prefetch_requests(offset, size): 415 continue 416 417 # break up anything larger than the max read size 418 while size > 0: 419 chunk_size = min(size, self.MAX_REQUEST_SIZE) 420 read_chunks.append((offset, chunk_size)) 421 offset += chunk_size 422 size -= chunk_size 423 424 self._start_prefetch(read_chunks) 425 # now we can just devolve to a bunch of read()s :) 426 for x in chunks: 427 self.seek(x[0]) 428 yield self.read(x[1])
429 430 431 ### internals... 432 433
434 - def _get_size(self):
435 try: 436 return self.stat().st_size 437 except: 438 return 0
439
440 - def _start_prefetch(self, chunks):
441 self._prefetching = True 442 self._prefetch_done = False 443 self._prefetch_reads.extend(chunks) 444 445 t = threading.Thread(target=self._prefetch_thread, args=(chunks,)) 446 t.setDaemon(True) 447 t.start()
448
449 - def _prefetch_thread(self, chunks):
450 # do these read requests in a temporary thread because there may be 451 # a lot of them, so it may block. 452 for offset, length in chunks: 453 self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
454
455 - def _async_response(self, t, msg):
456 if t == CMD_STATUS: 457 # save exception and re-raise it on next file operation 458 try: 459 self.sftp._convert_status(msg) 460 except Exception, x: 461 self._saved_exception = x 462 return 463 if t != CMD_DATA: 464 raise SFTPError('Expected data') 465 data = msg.get_string() 466 offset, length = self._prefetch_reads.pop(0) 467 self._prefetch_data[offset] = data 468 if len(self._prefetch_reads) == 0: 469 self._prefetch_done = True
470
471 - def _check_exception(self):
472 "if there's a saved exception, raise & clear it" 473 if self._saved_exception is not None: 474 x = self._saved_exception 475 self._saved_exception = None 476 raise x
477