aboutsummaryrefslogtreecommitdiff
path: root/tests/test_sftp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_sftp.py')
-rwxr-xr-x[-rw-r--r--]tests/test_sftp.py339
1 files changed, 139 insertions, 200 deletions
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 993899a..edc0599 100644..100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+# Copyright (C) 2003-2007 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
@@ -23,9 +23,11 @@ a real actual sftp server is contacted, and a new folder is created there to
do test file operations in (so no existing files will be harmed).
"""
+from binascii import hexlify
import logging
import os
import random
+import struct
import sys
import threading
import time
@@ -69,6 +71,11 @@ tc = None
g_big_file_test = True
+def get_sftp():
+ global sftp
+ return sftp
+
+
class SFTPTest (unittest.TestCase):
def init(hostname, username, keyfile, passwd):
@@ -273,28 +280,85 @@ class SFTPTest (unittest.TestCase):
def test_8_setstat(self):
"""
- verify that the setstat functions (chown, chmod, utime) work.
+ verify that the setstat functions (chown, chmod, utime, truncate) work.
"""
f = sftp.open(FOLDER + '/special', 'w')
try:
+ f.write('x' * 1024)
f.close()
stat = sftp.stat(FOLDER + '/special')
sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600)
- self.assertEqual(sftp.stat(FOLDER + '/special').st_mode & 0777, 0600)
+ stat = sftp.stat(FOLDER + '/special')
+ expected_mode = 0600
+ if sys.platform == 'win32':
+ # chmod not really functional on windows
+ expected_mode = 0666
+ if sys.platform == 'cygwin':
+ # even worse.
+ expected_mode = 0644
+ self.assertEqual(stat.st_mode & 0777, expected_mode)
+ self.assertEqual(stat.st_size, 1024)
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
sftp.utime(FOLDER + '/special', (atime, mtime))
- nstat = sftp.stat(FOLDER + '/special')
- self.assertEqual(nstat.st_mtime, mtime)
- self.assertEqual(nstat.st_atime, atime)
+ stat = sftp.stat(FOLDER + '/special')
+ self.assertEqual(stat.st_mtime, mtime)
+ if sys.platform not in ('win32', 'cygwin'):
+ self.assertEqual(stat.st_atime, atime)
# can't really test chown, since we'd have to know a valid uid.
+
+ sftp.truncate(FOLDER + '/special', 512)
+ stat = sftp.stat(FOLDER + '/special')
+ self.assertEqual(stat.st_size, 512)
finally:
sftp.remove(FOLDER + '/special')
- def test_9_readline_seek(self):
+ def test_9_fsetstat(self):
+ """
+ verify that the fsetstat functions (chown, chmod, utime, truncate)
+ work on open files.
+ """
+ f = sftp.open(FOLDER + '/special', 'w')
+ try:
+ f.write('x' * 1024)
+ f.close()
+
+ f = sftp.open(FOLDER + '/special', 'r+')
+ stat = f.stat()
+ f.chmod((stat.st_mode & ~0777) | 0600)
+ stat = f.stat()
+
+ expected_mode = 0600
+ if sys.platform == 'win32':
+ # chmod not really functional on windows
+ expected_mode = 0666
+ if sys.platform == 'cygwin':
+ # even worse.
+ expected_mode = 0644
+ self.assertEqual(stat.st_mode & 0777, expected_mode)
+ self.assertEqual(stat.st_size, 1024)
+
+ mtime = stat.st_mtime - 3600
+ atime = stat.st_atime - 1800
+ f.utime((atime, mtime))
+ stat = f.stat()
+ self.assertEqual(stat.st_mtime, mtime)
+ if sys.platform not in ('win32', 'cygwin'):
+ self.assertEqual(stat.st_atime, atime)
+
+ # can't really test chown, since we'd have to know a valid uid.
+
+ f.truncate(512)
+ stat = f.stat()
+ self.assertEqual(stat.st_size, 512)
+ f.close()
+ finally:
+ sftp.remove(FOLDER + '/special')
+
+ def test_A_readline_seek(self):
"""
create a text file and write a bunch of text into it. then count the lines
in the file, and seek around to retreive particular lines. this should
@@ -324,7 +388,7 @@ class SFTPTest (unittest.TestCase):
finally:
sftp.remove(FOLDER + '/duck.txt')
- def test_A_write_seek(self):
+ def test_B_write_seek(self):
"""
create a text file, seek back and change part of it, and verify that the
changes worked.
@@ -344,10 +408,14 @@ class SFTPTest (unittest.TestCase):
finally:
sftp.remove(FOLDER + '/testing.txt')
- def test_B_symlink(self):
+ def test_C_symlink(self):
"""
create a symlink and then check that lstat doesn't follow it.
"""
+ if not hasattr(os, "symlink"):
+ # skip symlink tests on windows
+ return
+
f = sftp.open(FOLDER + '/original.txt', 'w')
try:
f.write('original\n')
@@ -387,7 +455,7 @@ class SFTPTest (unittest.TestCase):
except:
pass
- def test_C_flush_seek(self):
+ def test_D_flush_seek(self):
"""
verify that buffered writes are automatically flushed on seek.
"""
@@ -409,183 +477,7 @@ class SFTPTest (unittest.TestCase):
except:
pass
- def test_D_lots_of_files(self):
- """
- create a bunch of files over the same session.
- """
- global g_big_file_test
- if not g_big_file_test:
- return
- numfiles = 100
- try:
- for i in range(numfiles):
- f = sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1)
- f.write('this is file #%d.\n' % i)
- f.close()
- sftp.chmod('%s/file%d.txt' % (FOLDER, i), 0660)
-
- # now make sure every file is there, by creating a list of filenmes
- # and reading them in random order.
- numlist = range(numfiles)
- while len(numlist) > 0:
- r = numlist[random.randint(0, len(numlist) - 1)]
- f = sftp.open('%s/file%d.txt' % (FOLDER, r))
- self.assertEqual(f.readline(), 'this is file #%d.\n' % r)
- f.close()
- numlist.remove(r)
- finally:
- for i in range(numfiles):
- try:
- sftp.remove('%s/file%d.txt' % (FOLDER, i))
- except:
- pass
-
- def test_E_big_file(self):
- """
- write a 1MB file with no buffering.
- """
- global g_big_file_test
- if not g_big_file_test:
- return
- kblob = (1024 * 'x')
- start = time.time()
- try:
- f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
- for n in range(1024):
- f.write(kblob)
- if n % 128 == 0:
- sys.stderr.write('.')
- f.close()
- sys.stderr.write(' ')
-
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
- end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
-
- start = time.time()
- f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
- for n in range(1024):
- data = f.read(1024)
- self.assertEqual(data, kblob)
- f.close()
-
- end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
- finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_F_big_file_pipelined(self):
- """
- write a 1MB file, with no linefeeds, using pipelining.
- """
- global g_big_file_test
- if not g_big_file_test:
- return
- kblob = (1024 * 'x')
- start = time.time()
- try:
- f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
- f.set_pipelined(True)
- for n in range(1024):
- f.write(kblob)
- if n % 128 == 0:
- sys.stderr.write('.')
- f.close()
- sys.stderr.write(' ')
-
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
- end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
-
- start = time.time()
- f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
- f.prefetch()
- for n in range(1024):
- data = f.read(1024)
- self.assertEqual(data, kblob)
- f.close()
-
- end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
- finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_G_lots_of_prefetching(self):
- """
- prefetch a 1MB file a bunch of times, discarding the file object
- without using it, to verify that paramiko doesn't get confused.
- """
- global g_big_file_test
- if not g_big_file_test:
- return
- kblob = (1024 * 'x')
- try:
- f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
- f.set_pipelined(True)
- for n in range(1024):
- f.write(kblob)
- if n % 128 == 0:
- sys.stderr.write('.')
- f.close()
- sys.stderr.write(' ')
-
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
-
- for i in range(10):
- f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
- f.prefetch()
- f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
- f.prefetch()
- for n in range(1024):
- data = f.read(1024)
- self.assertEqual(data, kblob)
- if n % 128 == 0:
- sys.stderr.write('.')
- f.close()
- sys.stderr.write(' ')
- finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_H_big_file_big_buffer(self):
- """
- write a 1MB file, with no linefeeds, and a big buffer.
- """
- global g_big_file_test
- if not g_big_file_test:
- return
- mblob = (1024 * 1024 * 'x')
- try:
- f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
- f.write(mblob)
- f.close()
-
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
- finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_I_big_file_renegotiate(self):
- """
- write a 1MB file, forcing key renegotiation in the middle.
- """
- global g_big_file_test
- if not g_big_file_test:
- return
- t = sftp.sock.get_transport()
- t.packetizer.REKEY_BYTES = 512 * 1024
- k32blob = (32 * 1024 * 'x')
- try:
- f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
- for i in xrange(32):
- f.write(k32blob)
- f.close()
-
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
- self.assertNotEquals(t.H, t.session_id)
- finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
- t.packetizer.REKEY_BYTES = pow(2, 30)
-
- def test_J_realpath(self):
+ def test_E_realpath(self):
"""
test that realpath is returning something non-empty and not an
error.
@@ -596,7 +488,7 @@ class SFTPTest (unittest.TestCase):
self.assert_(len(f) > 0)
self.assertEquals(os.path.join(pwd, FOLDER), f)
- def test_K_mkdir(self):
+ def test_F_mkdir(self):
"""
verify that mkdir/rmdir work.
"""
@@ -619,7 +511,7 @@ class SFTPTest (unittest.TestCase):
except IOError:
pass
- def test_L_chdir(self):
+ def test_G_chdir(self):
"""
verify that chdir/getcwd work.
"""
@@ -656,7 +548,7 @@ class SFTPTest (unittest.TestCase):
except:
pass
- def test_M_get_put(self):
+ def test_H_get_put(self):
"""
verify that get/put work.
"""
@@ -665,27 +557,33 @@ class SFTPTest (unittest.TestCase):
localname = os.tempnam()
text = 'All I wanted was a plastic bunny rabbit.\n'
- f = open(localname, 'w')
+ f = open(localname, 'wb')
f.write(text)
f.close()
- sftp.put(localname, FOLDER + '/bunny.txt')
+ saved_progress = []
+ def progress_callback(x, y):
+ saved_progress.append((x, y))
+ sftp.put(localname, FOLDER + '/bunny.txt', progress_callback)
f = sftp.open(FOLDER + '/bunny.txt', 'r')
self.assertEquals(text, f.read(128))
f.close()
+ self.assertEquals((41, 41), saved_progress[-1])
os.unlink(localname)
localname = os.tempnam()
- sftp.get(FOLDER + '/bunny.txt', localname)
+ saved_progress = []
+ sftp.get(FOLDER + '/bunny.txt', localname, progress_callback)
- f = open(localname, 'r')
+ f = open(localname, 'rb')
self.assertEquals(text, f.read(128))
f.close()
+ self.assertEquals((41, 41), saved_progress[-1])
os.unlink(localname)
sftp.unlink(FOLDER + '/bunny.txt')
- def test_N_check(self):
+ def test_I_check(self):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who
@@ -698,16 +596,17 @@ class SFTPTest (unittest.TestCase):
try:
f = sftp.open(FOLDER + '/kitty.txt', 'r')
sum = f.check('sha1')
- self.assertEquals('91059CFC6615941378D413CB5ADAF4C5EB293402', paramiko.util.hexify(sum))
+ self.assertEquals('91059CFC6615941378D413CB5ADAF4C5EB293402', hexlify(sum).upper())
sum = f.check('md5', 0, 512)
- self.assertEquals('93DE4788FCA28D471516963A1FE3856A', paramiko.util.hexify(sum))
+ self.assertEquals('93DE4788FCA28D471516963A1FE3856A', hexlify(sum).upper())
sum = f.check('md5', 0, 0, 510)
self.assertEquals('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
- paramiko.util.hexify(sum))
+ hexlify(sum).upper())
+ f.close()
finally:
sftp.unlink(FOLDER + '/kitty.txt')
- def test_O_x_flag(self):
+ def test_J_x_flag(self):
"""
verify that the 'x' flag works when opening a file.
"""
@@ -723,7 +622,7 @@ class SFTPTest (unittest.TestCase):
finally:
sftp.unlink(FOLDER + '/unusual.txt')
- def test_P_utf8(self):
+ def test_K_utf8(self):
"""
verify that unicode strings are encoded into utf8 correctly.
"""
@@ -738,3 +637,43 @@ class SFTPTest (unittest.TestCase):
self.fail('exception ' + e)
sftp.unlink(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65')
+ def test_L_bad_readv(self):
+ """
+ verify that readv at the end of the file doesn't essplode.
+ """
+ f = sftp.open(FOLDER + '/zero', 'w')
+ f.close()
+ try:
+ f = sftp.open(FOLDER + '/zero', 'r')
+ data = f.readv([(0, 12)])
+ f.close()
+
+ f = sftp.open(FOLDER + '/zero', 'r')
+ f.prefetch()
+ data = f.read(100)
+ f.close()
+ finally:
+ sftp.unlink(FOLDER + '/zero')
+
+ def XXX_test_M_seek_append(self):
+ """
+ verify that seek does't affect writes during append.
+
+ does not work except through paramiko. :( openssh fails.
+ """
+ f = sftp.open(FOLDER + '/append.txt', 'a')
+ try:
+ f.write('first line\nsecond line\n')
+ f.seek(11, f.SEEK_SET)
+ f.write('third line\n')
+ f.close()
+
+ f = sftp.open(FOLDER + '/append.txt', 'r')
+ self.assertEqual(f.stat().st_size, 34)
+ self.assertEqual(f.readline(), 'first line\n')
+ self.assertEqual(f.readline(), 'second line\n')
+ self.assertEqual(f.readline(), 'third line\n')
+ f.close()
+ finally:
+ sftp.remove(FOLDER + '/append.txt')
+