diff options
Diffstat (limited to 'tests/stub_sftp.py')
-rw-r--r-- | tests/stub_sftp.py | 192 |
1 files changed, 192 insertions, 0 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py new file mode 100644 index 0000000..4b8b9c3 --- /dev/null +++ b/tests/stub_sftp.py @@ -0,0 +1,192 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +A stub SFTP server for loopback SFTP testing. +""" + +import os +from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \ + SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED + + +class StubServer (ServerInterface): + def check_auth_password(self, username, password): + # all are allowed + return AUTH_SUCCESSFUL + + def check_channel_request(self, kind, chanid): + return OPEN_SUCCEEDED + + +class StubSFTPHandle (SFTPHandle): + def stat(self): + try: + return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno())) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + def chattr(self, attr): + # python doesn't have equivalents to fchown or fchmod, so we have to + # use the stored filename + try: + SFTPServer.set_file_attr(self.filename, attr) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + +class StubSFTPServer (SFTPServerInterface): + # assume current folder is a fine root + # (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess) + ROOT = os.getcwd() + + def _realpath(self, path): + return self.ROOT + self.canonicalize(path) + + def list_folder(self, path): + path = self._realpath(path) + try: + out = [ ] + flist = os.listdir(path) + for fname in flist: + attr = SFTPAttributes.from_stat(os.stat(os.path.join(path, fname))) + attr.filename = fname + out.append(attr) + return out + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + def stat(self, path): + path = self._realpath(path) + try: + return SFTPAttributes.from_stat(os.stat(path)) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + def lstat(self, path): + path = self._realpath(path) + try: + return SFTPAttributes.from_stat(os.lstat(path)) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + def open(self, path, flags, attr): + path = self._realpath(path) + try: + fd = os.open(path, flags) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + if (flags & os.O_CREAT) and (attr is not None): + SFTPServer.set_file_attr(path, attr) + if flags & os.O_WRONLY: + fstr = 'w' + elif flags & os.O_RDWR: + fstr = 'r+' + else: + # O_RDONLY (== 0) + fstr = 'r' + try: + f = os.fdopen(fd, fstr) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + fobj = StubSFTPHandle() + fobj.filename = path + fobj.readfile = f + fobj.writefile = f + return fobj + + def remove(self, path): + path = self._realpath(path) + try: + os.remove(path) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def rename(self, oldpath, newpath): + oldpath = self._realpath(oldpath) + newpath = self._realpath(newpath) + try: + os.rename(oldpath, newpath) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def mkdir(self, path, attr): + path = self._realpath(path) + try: + os.mkdir(path) + if attr is not None: + SFTPServer.set_file_attr(path, attr) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def rmdir(self, path): + path = self._realpath(path) + try: + os.rmdir(path) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def chattr(self, path, attr): + path = self._realpath(path) + try: + SFTPServer.set_file_attr(path, attr) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def symlink(self, target_path, path): + path = self._realpath(path) + if (len(target_path) > 0) and (target_path[0] == '/'): + # absolute symlink + target_path = os.path.join(self.ROOT, target_path[1:]) + if target_path[:2] == '//': + # bug in os.path.join + target_path = target_path[1:] + else: + # compute relative to path + abspath = os.path.join(os.path.dirname(path), target_path) + if abspath[:len(self.ROOT)] != self.ROOT: + # this symlink isn't going to work anyway -- just break it immediately + target_path = '<error>' + try: + os.symlink(target_path, path) + except: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def readlink(self, path): + path = self._realpath(path) + try: + symlink = os.readlink(path) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + # if it's absolute, remove the root + if os.path.isabs(symlink): + if symlink[:len(self.ROOT)] == self.ROOT: + symlink = symlink[len(self.ROOT):] + if (len(symlink) == 0) or (symlink[0] != '/'): + symlink = '/' + symlink + else: + symlink = '<error>' + return symlink |