diff options
author | Jeremy T. Bouse <jbouse@debian.org> | 2009-11-27 16:20:12 -0500 |
---|---|---|
committer | Jeremy T. Bouse <jbouse@debian.org> | 2009-11-27 16:20:12 -0500 |
commit | ed280d5ac360e2af796e9bd973d7b4df89f0c449 (patch) | |
tree | ce892d6ce9dad8c0ecbc9cbe73f8095195bef0b4 /tests/test_sftp.py | |
parent | 176c6caf4ea7918e1698438634b237fab8456471 (diff) | |
download | python-paramiko-ed280d5ac360e2af796e9bd973d7b4df89f0c449.tar python-paramiko-ed280d5ac360e2af796e9bd973d7b4df89f0c449.tar.gz |
Imported Upstream version 1.7.4upstream/1.7.4
Diffstat (limited to 'tests/test_sftp.py')
-rwxr-xr-x[-rw-r--r--] | tests/test_sftp.py | 339 |
1 files changed, 139 insertions, 200 deletions
diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 993899a..edc0599 100644..100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -1,4 +1,4 @@ -# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# Copyright (C) 2003-2007 Robey Pointer <robey@lag.net> # # This file is part of paramiko. # @@ -23,9 +23,11 @@ a real actual sftp server is contacted, and a new folder is created there to do test file operations in (so no existing files will be harmed). """ +from binascii import hexlify import logging import os import random +import struct import sys import threading import time @@ -69,6 +71,11 @@ tc = None g_big_file_test = True +def get_sftp(): + global sftp + return sftp + + class SFTPTest (unittest.TestCase): def init(hostname, username, keyfile, passwd): @@ -273,28 +280,85 @@ class SFTPTest (unittest.TestCase): def test_8_setstat(self): """ - verify that the setstat functions (chown, chmod, utime) work. + verify that the setstat functions (chown, chmod, utime, truncate) work. """ f = sftp.open(FOLDER + '/special', 'w') try: + f.write('x' * 1024) f.close() stat = sftp.stat(FOLDER + '/special') sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600) - self.assertEqual(sftp.stat(FOLDER + '/special').st_mode & 0777, 0600) + stat = sftp.stat(FOLDER + '/special') + expected_mode = 0600 + if sys.platform == 'win32': + # chmod not really functional on windows + expected_mode = 0666 + if sys.platform == 'cygwin': + # even worse. + expected_mode = 0644 + self.assertEqual(stat.st_mode & 0777, expected_mode) + self.assertEqual(stat.st_size, 1024) mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 sftp.utime(FOLDER + '/special', (atime, mtime)) - nstat = sftp.stat(FOLDER + '/special') - self.assertEqual(nstat.st_mtime, mtime) - self.assertEqual(nstat.st_atime, atime) + stat = sftp.stat(FOLDER + '/special') + self.assertEqual(stat.st_mtime, mtime) + if sys.platform not in ('win32', 'cygwin'): + self.assertEqual(stat.st_atime, atime) # can't really test chown, since we'd have to know a valid uid. + + sftp.truncate(FOLDER + '/special', 512) + stat = sftp.stat(FOLDER + '/special') + self.assertEqual(stat.st_size, 512) finally: sftp.remove(FOLDER + '/special') - def test_9_readline_seek(self): + def test_9_fsetstat(self): + """ + verify that the fsetstat functions (chown, chmod, utime, truncate) + work on open files. + """ + f = sftp.open(FOLDER + '/special', 'w') + try: + f.write('x' * 1024) + f.close() + + f = sftp.open(FOLDER + '/special', 'r+') + stat = f.stat() + f.chmod((stat.st_mode & ~0777) | 0600) + stat = f.stat() + + expected_mode = 0600 + if sys.platform == 'win32': + # chmod not really functional on windows + expected_mode = 0666 + if sys.platform == 'cygwin': + # even worse. + expected_mode = 0644 + self.assertEqual(stat.st_mode & 0777, expected_mode) + self.assertEqual(stat.st_size, 1024) + + mtime = stat.st_mtime - 3600 + atime = stat.st_atime - 1800 + f.utime((atime, mtime)) + stat = f.stat() + self.assertEqual(stat.st_mtime, mtime) + if sys.platform not in ('win32', 'cygwin'): + self.assertEqual(stat.st_atime, atime) + + # can't really test chown, since we'd have to know a valid uid. + + f.truncate(512) + stat = f.stat() + self.assertEqual(stat.st_size, 512) + f.close() + finally: + sftp.remove(FOLDER + '/special') + + def test_A_readline_seek(self): """ create a text file and write a bunch of text into it. then count the lines in the file, and seek around to retreive particular lines. this should @@ -324,7 +388,7 @@ class SFTPTest (unittest.TestCase): finally: sftp.remove(FOLDER + '/duck.txt') - def test_A_write_seek(self): + def test_B_write_seek(self): """ create a text file, seek back and change part of it, and verify that the changes worked. @@ -344,10 +408,14 @@ class SFTPTest (unittest.TestCase): finally: sftp.remove(FOLDER + '/testing.txt') - def test_B_symlink(self): + def test_C_symlink(self): """ create a symlink and then check that lstat doesn't follow it. """ + if not hasattr(os, "symlink"): + # skip symlink tests on windows + return + f = sftp.open(FOLDER + '/original.txt', 'w') try: f.write('original\n') @@ -387,7 +455,7 @@ class SFTPTest (unittest.TestCase): except: pass - def test_C_flush_seek(self): + def test_D_flush_seek(self): """ verify that buffered writes are automatically flushed on seek. """ @@ -409,183 +477,7 @@ class SFTPTest (unittest.TestCase): except: pass - def test_D_lots_of_files(self): - """ - create a bunch of files over the same session. - """ - global g_big_file_test - if not g_big_file_test: - return - numfiles = 100 - try: - for i in range(numfiles): - f = sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) - f.write('this is file #%d.\n' % i) - f.close() - sftp.chmod('%s/file%d.txt' % (FOLDER, i), 0660) - - # now make sure every file is there, by creating a list of filenmes - # and reading them in random order. - numlist = range(numfiles) - while len(numlist) > 0: - r = numlist[random.randint(0, len(numlist) - 1)] - f = sftp.open('%s/file%d.txt' % (FOLDER, r)) - self.assertEqual(f.readline(), 'this is file #%d.\n' % r) - f.close() - numlist.remove(r) - finally: - for i in range(numfiles): - try: - sftp.remove('%s/file%d.txt' % (FOLDER, i)) - except: - pass - - def test_E_big_file(self): - """ - write a 1MB file with no buffering. - """ - global g_big_file_test - if not g_big_file_test: - return - kblob = (1024 * 'x') - start = time.time() - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w') - for n in range(1024): - f.write(kblob) - if n % 128 == 0: - sys.stderr.write('.') - f.close() - sys.stderr.write(' ') - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - end = time.time() - sys.stderr.write('%ds ' % round(end - start)) - - start = time.time() - f = sftp.open('%s/hongry.txt' % FOLDER, 'r') - for n in range(1024): - data = f.read(1024) - self.assertEqual(data, kblob) - f.close() - - end = time.time() - sys.stderr.write('%ds ' % round(end - start)) - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - - def test_F_big_file_pipelined(self): - """ - write a 1MB file, with no linefeeds, using pipelining. - """ - global g_big_file_test - if not g_big_file_test: - return - kblob = (1024 * 'x') - start = time.time() - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w') - f.set_pipelined(True) - for n in range(1024): - f.write(kblob) - if n % 128 == 0: - sys.stderr.write('.') - f.close() - sys.stderr.write(' ') - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - end = time.time() - sys.stderr.write('%ds ' % round(end - start)) - - start = time.time() - f = sftp.open('%s/hongry.txt' % FOLDER, 'r') - f.prefetch() - for n in range(1024): - data = f.read(1024) - self.assertEqual(data, kblob) - f.close() - - end = time.time() - sys.stderr.write('%ds ' % round(end - start)) - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - - def test_G_lots_of_prefetching(self): - """ - prefetch a 1MB file a bunch of times, discarding the file object - without using it, to verify that paramiko doesn't get confused. - """ - global g_big_file_test - if not g_big_file_test: - return - kblob = (1024 * 'x') - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w') - f.set_pipelined(True) - for n in range(1024): - f.write(kblob) - if n % 128 == 0: - sys.stderr.write('.') - f.close() - sys.stderr.write(' ') - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - - for i in range(10): - f = sftp.open('%s/hongry.txt' % FOLDER, 'r') - f.prefetch() - f = sftp.open('%s/hongry.txt' % FOLDER, 'r') - f.prefetch() - for n in range(1024): - data = f.read(1024) - self.assertEqual(data, kblob) - if n % 128 == 0: - sys.stderr.write('.') - f.close() - sys.stderr.write(' ') - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - - def test_H_big_file_big_buffer(self): - """ - write a 1MB file, with no linefeeds, and a big buffer. - """ - global g_big_file_test - if not g_big_file_test: - return - mblob = (1024 * 1024 * 'x') - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) - f.write(mblob) - f.close() - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - - def test_I_big_file_renegotiate(self): - """ - write a 1MB file, forcing key renegotiation in the middle. - """ - global g_big_file_test - if not g_big_file_test: - return - t = sftp.sock.get_transport() - t.packetizer.REKEY_BYTES = 512 * 1024 - k32blob = (32 * 1024 * 'x') - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) - for i in xrange(32): - f.write(k32blob) - f.close() - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - self.assertNotEquals(t.H, t.session_id) - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - t.packetizer.REKEY_BYTES = pow(2, 30) - - def test_J_realpath(self): + def test_E_realpath(self): """ test that realpath is returning something non-empty and not an error. @@ -596,7 +488,7 @@ class SFTPTest (unittest.TestCase): self.assert_(len(f) > 0) self.assertEquals(os.path.join(pwd, FOLDER), f) - def test_K_mkdir(self): + def test_F_mkdir(self): """ verify that mkdir/rmdir work. """ @@ -619,7 +511,7 @@ class SFTPTest (unittest.TestCase): except IOError: pass - def test_L_chdir(self): + def test_G_chdir(self): """ verify that chdir/getcwd work. """ @@ -656,7 +548,7 @@ class SFTPTest (unittest.TestCase): except: pass - def test_M_get_put(self): + def test_H_get_put(self): """ verify that get/put work. """ @@ -665,27 +557,33 @@ class SFTPTest (unittest.TestCase): localname = os.tempnam() text = 'All I wanted was a plastic bunny rabbit.\n' - f = open(localname, 'w') + f = open(localname, 'wb') f.write(text) f.close() - sftp.put(localname, FOLDER + '/bunny.txt') + saved_progress = [] + def progress_callback(x, y): + saved_progress.append((x, y)) + sftp.put(localname, FOLDER + '/bunny.txt', progress_callback) f = sftp.open(FOLDER + '/bunny.txt', 'r') self.assertEquals(text, f.read(128)) f.close() + self.assertEquals((41, 41), saved_progress[-1]) os.unlink(localname) localname = os.tempnam() - sftp.get(FOLDER + '/bunny.txt', localname) + saved_progress = [] + sftp.get(FOLDER + '/bunny.txt', localname, progress_callback) - f = open(localname, 'r') + f = open(localname, 'rb') self.assertEquals(text, f.read(128)) f.close() + self.assertEquals((41, 41), saved_progress[-1]) os.unlink(localname) sftp.unlink(FOLDER + '/bunny.txt') - def test_N_check(self): + def test_I_check(self): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who @@ -698,16 +596,17 @@ class SFTPTest (unittest.TestCase): try: f = sftp.open(FOLDER + '/kitty.txt', 'r') sum = f.check('sha1') - self.assertEquals('91059CFC6615941378D413CB5ADAF4C5EB293402', paramiko.util.hexify(sum)) + self.assertEquals('91059CFC6615941378D413CB5ADAF4C5EB293402', hexlify(sum).upper()) sum = f.check('md5', 0, 512) - self.assertEquals('93DE4788FCA28D471516963A1FE3856A', paramiko.util.hexify(sum)) + self.assertEquals('93DE4788FCA28D471516963A1FE3856A', hexlify(sum).upper()) sum = f.check('md5', 0, 0, 510) self.assertEquals('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', - paramiko.util.hexify(sum)) + hexlify(sum).upper()) + f.close() finally: sftp.unlink(FOLDER + '/kitty.txt') - def test_O_x_flag(self): + def test_J_x_flag(self): """ verify that the 'x' flag works when opening a file. """ @@ -723,7 +622,7 @@ class SFTPTest (unittest.TestCase): finally: sftp.unlink(FOLDER + '/unusual.txt') - def test_P_utf8(self): + def test_K_utf8(self): """ verify that unicode strings are encoded into utf8 correctly. """ @@ -738,3 +637,43 @@ class SFTPTest (unittest.TestCase): self.fail('exception ' + e) sftp.unlink(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65') + def test_L_bad_readv(self): + """ + verify that readv at the end of the file doesn't essplode. + """ + f = sftp.open(FOLDER + '/zero', 'w') + f.close() + try: + f = sftp.open(FOLDER + '/zero', 'r') + data = f.readv([(0, 12)]) + f.close() + + f = sftp.open(FOLDER + '/zero', 'r') + f.prefetch() + data = f.read(100) + f.close() + finally: + sftp.unlink(FOLDER + '/zero') + + def XXX_test_M_seek_append(self): + """ + verify that seek does't affect writes during append. + + does not work except through paramiko. :( openssh fails. + """ + f = sftp.open(FOLDER + '/append.txt', 'a') + try: + f.write('first line\nsecond line\n') + f.seek(11, f.SEEK_SET) + f.write('third line\n') + f.close() + + f = sftp.open(FOLDER + '/append.txt', 'r') + self.assertEqual(f.stat().st_size, 34) + self.assertEqual(f.readline(), 'first line\n') + self.assertEqual(f.readline(), 'second line\n') + self.assertEqual(f.readline(), 'third line\n') + f.close() + finally: + sftp.remove(FOLDER + '/append.txt') + |