Package paramiko :: Module channel
[frames] | no frames]

Source Code for Module paramiko.channel

   1  # Copyright (C) 2003-2007  Robey Pointer <robey@lag.net> 
   2  # 
   3  # This file is part of paramiko. 
   4  # 
   5  # Paramiko is free software; you can redistribute it and/or modify it under the 
   6  # terms of the GNU Lesser General Public License as published by the Free 
   7  # Software Foundation; either version 2.1 of the License, or (at your option) 
   8  # any later version. 
   9  # 
  10  # Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY 
  11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
  12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
  13  # details. 
  14  # 
  15  # You should have received a copy of the GNU Lesser General Public License 
  16  # along with Paramiko; if not, write to the Free Software Foundation, Inc., 
  17  # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. 
  18   
  19  """ 
  20  Abstraction for an SSH2 channel. 
  21  """ 
  22   
  23  import binascii 
  24  import sys 
  25  import time 
  26  import threading 
  27  import socket 
  28  import os 
  29   
  30  from paramiko.common import * 
  31  from paramiko import util 
  32  from paramiko.message import Message 
  33  from paramiko.ssh_exception import SSHException 
  34  from paramiko.file import BufferedFile 
  35  from paramiko.buffered_pipe import BufferedPipe, PipeTimeout 
  36  from paramiko import pipe 
  37   
  38   
  39  # lower bound on the max packet size we'll accept from the remote host 
  40  MIN_PACKET_SIZE = 1024 
  41   
  42   
43 -class Channel (object):
44 """ 45 A secure tunnel across an SSH L{Transport}. A Channel is meant to behave 46 like a socket, and has an API that should be indistinguishable from the 47 python socket API. 48 49 Because SSH2 has a windowing kind of flow control, if you stop reading data 50 from a Channel and its buffer fills up, the server will be unable to send 51 you any more data until you read some of it. (This won't affect other 52 channels on the same transport -- all channels on a single transport are 53 flow-controlled independently.) Similarly, if the server isn't reading 54 data you send, calls to L{send} may block, unless you set a timeout. This 55 is exactly like a normal network socket, so it shouldn't be too surprising. 56 """ 57
58 - def __init__(self, chanid):
59 """ 60 Create a new channel. The channel is not associated with any 61 particular session or L{Transport} until the Transport attaches it. 62 Normally you would only call this method from the constructor of a 63 subclass of L{Channel}. 64 65 @param chanid: the ID of this channel, as passed by an existing 66 L{Transport}. 67 @type chanid: int 68 """ 69 self.chanid = chanid 70 self.remote_chanid = 0 71 self.transport = None 72 self.active = False 73 self.eof_received = 0 74 self.eof_sent = 0 75 self.in_buffer = BufferedPipe() 76 self.in_stderr_buffer = BufferedPipe() 77 self.timeout = None 78 self.closed = False 79 self.ultra_debug = False 80 self.lock = threading.Lock() 81 self.out_buffer_cv = threading.Condition(self.lock) 82 self.in_window_size = 0 83 self.out_window_size = 0 84 self.in_max_packet_size = 0 85 self.out_max_packet_size = 0 86 self.in_window_threshold = 0 87 self.in_window_sofar = 0 88 self.status_event = threading.Event() 89 self._name = str(chanid) 90 self.logger = util.get_logger('paramiko.transport') 91 self._pipe = None 92 self.event = threading.Event() 93 self.combine_stderr = False 94 self.exit_status = -1 95 self.origin_addr = None
96
97 - def __del__(self):
98 try: 99 self.close() 100 except: 101 pass
102
103 - def __repr__(self):
104 """ 105 Return a string representation of this object, for debugging. 106 107 @rtype: str 108 """ 109 out = '<paramiko.Channel %d' % self.chanid 110 if self.closed: 111 out += ' (closed)' 112 elif self.active: 113 if self.eof_received: 114 out += ' (EOF received)' 115 if self.eof_sent: 116 out += ' (EOF sent)' 117 out += ' (open) window=%d' % (self.out_window_size) 118 if len(self.in_buffer) > 0: 119 out += ' in-buffer=%d' % (len(self.in_buffer),) 120 out += ' -> ' + repr(self.transport) 121 out += '>' 122 return out
123
124 - def get_pty(self, term='vt100', width=80, height=24):
125 """ 126 Request a pseudo-terminal from the server. This is usually used right 127 after creating a client channel, to ask the server to provide some 128 basic terminal semantics for a shell invoked with L{invoke_shell}. 129 It isn't necessary (or desirable) to call this method if you're going 130 to exectue a single command with L{exec_command}. 131 132 @param term: the terminal type to emulate (for example, C{'vt100'}) 133 @type term: str 134 @param width: width (in characters) of the terminal screen 135 @type width: int 136 @param height: height (in characters) of the terminal screen 137 @type height: int 138 139 @raise SSHException: if the request was rejected or the channel was 140 closed 141 """ 142 if self.closed or self.eof_received or self.eof_sent or not self.active: 143 raise SSHException('Channel is not open') 144 m = Message() 145 m.add_byte(chr(MSG_CHANNEL_REQUEST)) 146 m.add_int(self.remote_chanid) 147 m.add_string('pty-req') 148 m.add_boolean(True) 149 m.add_string(term) 150 m.add_int(width) 151 m.add_int(height) 152 # pixel height, width (usually useless) 153 m.add_int(0).add_int(0) 154 m.add_string('') 155 self.event.clear() 156 self.transport._send_user_message(m) 157 self._wait_for_event()
158
159 - def invoke_shell(self):
160 """ 161 Request an interactive shell session on this channel. If the server 162 allows it, the channel will then be directly connected to the stdin, 163 stdout, and stderr of the shell. 164 165 Normally you would call L{get_pty} before this, in which case the 166 shell will operate through the pty, and the channel will be connected 167 to the stdin and stdout of the pty. 168 169 When the shell exits, the channel will be closed and can't be reused. 170 You must open a new channel if you wish to open another shell. 171 172 @raise SSHException: if the request was rejected or the channel was 173 closed 174 """ 175 if self.closed or self.eof_received or self.eof_sent or not self.active: 176 raise SSHException('Channel is not open') 177 m = Message() 178 m.add_byte(chr(MSG_CHANNEL_REQUEST)) 179 m.add_int(self.remote_chanid) 180 m.add_string('shell') 181 m.add_boolean(1) 182 self.event.clear() 183 self.transport._send_user_message(m) 184 self._wait_for_event()
185
186 - def exec_command(self, command):
187 """ 188 Execute a command on the server. If the server allows it, the channel 189 will then be directly connected to the stdin, stdout, and stderr of 190 the command being executed. 191 192 When the command finishes executing, the channel will be closed and 193 can't be reused. You must open a new channel if you wish to execute 194 another command. 195 196 @param command: a shell command to execute. 197 @type command: str 198 199 @raise SSHException: if the request was rejected or the channel was 200 closed 201 """ 202 if self.closed or self.eof_received or self.eof_sent or not self.active: 203 raise SSHException('Channel is not open') 204 m = Message() 205 m.add_byte(chr(MSG_CHANNEL_REQUEST)) 206 m.add_int(self.remote_chanid) 207 m.add_string('exec') 208 m.add_boolean(True) 209 m.add_string(command) 210 self.event.clear() 211 self.transport._send_user_message(m) 212 self._wait_for_event()
213
214 - def invoke_subsystem(self, subsystem):
215 """ 216 Request a subsystem on the server (for example, C{sftp}). If the 217 server allows it, the channel will then be directly connected to the 218 requested subsystem. 219 220 When the subsystem finishes, the channel will be closed and can't be 221 reused. 222 223 @param subsystem: name of the subsystem being requested. 224 @type subsystem: str 225 226 @raise SSHException: if the request was rejected or the channel was 227 closed 228 """ 229 if self.closed or self.eof_received or self.eof_sent or not self.active: 230 raise SSHException('Channel is not open') 231 m = Message() 232 m.add_byte(chr(MSG_CHANNEL_REQUEST)) 233 m.add_int(self.remote_chanid) 234 m.add_string('subsystem') 235 m.add_boolean(True) 236 m.add_string(subsystem) 237 self.event.clear() 238 self.transport._send_user_message(m) 239 self._wait_for_event()
240
241 - def resize_pty(self, width=80, height=24):
242 """ 243 Resize the pseudo-terminal. This can be used to change the width and 244 height of the terminal emulation created in a previous L{get_pty} call. 245 246 @param width: new width (in characters) of the terminal screen 247 @type width: int 248 @param height: new height (in characters) of the terminal screen 249 @type height: int 250 251 @raise SSHException: if the request was rejected or the channel was 252 closed 253 """ 254 if self.closed or self.eof_received or self.eof_sent or not self.active: 255 raise SSHException('Channel is not open') 256 m = Message() 257 m.add_byte(chr(MSG_CHANNEL_REQUEST)) 258 m.add_int(self.remote_chanid) 259 m.add_string('window-change') 260 m.add_boolean(True) 261 m.add_int(width) 262 m.add_int(height) 263 m.add_int(0).add_int(0) 264 self.event.clear() 265 self.transport._send_user_message(m) 266 self._wait_for_event()
267
268 - def exit_status_ready(self):
269 """ 270 Return true if the remote process has exited and returned an exit 271 status. You may use this to poll the process status if you don't 272 want to block in L{recv_exit_status}. Note that the server may not 273 return an exit status in some cases (like bad servers). 274 275 @return: True if L{recv_exit_status} will return immediately 276 @rtype: bool 277 @since: 1.7.3 278 """ 279 return self.closed or self.status_event.isSet()
280
281 - def recv_exit_status(self):
282 """ 283 Return the exit status from the process on the server. This is 284 mostly useful for retrieving the reults of an L{exec_command}. 285 If the command hasn't finished yet, this method will wait until 286 it does, or until the channel is closed. If no exit status is 287 provided by the server, -1 is returned. 288 289 @return: the exit code of the process on the server. 290 @rtype: int 291 292 @since: 1.2 293 """ 294 while True: 295 if self.closed or self.status_event.isSet(): 296 break 297 self.status_event.wait(0.1) 298 return self.exit_status
299
300 - def send_exit_status(self, status):
301 """ 302 Send the exit status of an executed command to the client. (This 303 really only makes sense in server mode.) Many clients expect to 304 get some sort of status code back from an executed command after 305 it completes. 306 307 @param status: the exit code of the process 308 @type status: int 309 310 @since: 1.2 311 """ 312 # in many cases, the channel will not still be open here. 313 # that's fine. 314 m = Message() 315 m.add_byte(chr(MSG_CHANNEL_REQUEST)) 316 m.add_int(self.remote_chanid) 317 m.add_string('exit-status') 318 m.add_boolean(False) 319 m.add_int(status) 320 self.transport._send_user_message(m)
321
322 - def request_x11(self, screen_number=0, auth_protocol=None, auth_cookie=None, 323 single_connection=False, handler=None):
324 """ 325 Request an x11 session on this channel. If the server allows it, 326 further x11 requests can be made from the server to the client, 327 when an x11 application is run in a shell session. 328 329 From RFC4254:: 330 331 It is RECOMMENDED that the 'x11 authentication cookie' that is 332 sent be a fake, random cookie, and that the cookie be checked and 333 replaced by the real cookie when a connection request is received. 334 335 If you omit the auth_cookie, a new secure random 128-bit value will be 336 generated, used, and returned. You will need to use this value to 337 verify incoming x11 requests and replace them with the actual local 338 x11 cookie (which requires some knoweldge of the x11 protocol). 339 340 If a handler is passed in, the handler is called from another thread 341 whenever a new x11 connection arrives. The default handler queues up 342 incoming x11 connections, which may be retrieved using 343 L{Transport.accept}. The handler's calling signature is:: 344 345 handler(channel: Channel, (address: str, port: int)) 346 347 @param screen_number: the x11 screen number (0, 10, etc) 348 @type screen_number: int 349 @param auth_protocol: the name of the X11 authentication method used; 350 if none is given, C{"MIT-MAGIC-COOKIE-1"} is used 351 @type auth_protocol: str 352 @param auth_cookie: hexadecimal string containing the x11 auth cookie; 353 if none is given, a secure random 128-bit value is generated 354 @type auth_cookie: str 355 @param single_connection: if True, only a single x11 connection will be 356 forwarded (by default, any number of x11 connections can arrive 357 over this session) 358 @type single_connection: bool 359 @param handler: an optional handler to use for incoming X11 connections 360 @type handler: function 361 @return: the auth_cookie used 362 """ 363 if self.closed or self.eof_received or self.eof_sent or not self.active: 364 raise SSHException('Channel is not open') 365 if auth_protocol is None: 366 auth_protocol = 'MIT-MAGIC-COOKIE-1' 367 if auth_cookie is None: 368 auth_cookie = binascii.hexlify(self.transport.randpool.get_bytes(16)) 369 370 m = Message() 371 m.add_byte(chr(MSG_CHANNEL_REQUEST)) 372 m.add_int(self.remote_chanid) 373 m.add_string('x11-req') 374 m.add_boolean(True) 375 m.add_boolean(single_connection) 376 m.add_string(auth_protocol) 377 m.add_string(auth_cookie) 378 m.add_int(screen_number) 379 self.event.clear() 380 self.transport._send_user_message(m) 381 self._wait_for_event() 382 self.transport._set_x11_handler(handler) 383 return auth_cookie
384
385 - def get_transport(self):
386 """ 387 Return the L{Transport} associated with this channel. 388 389 @return: the L{Transport} that was used to create this channel. 390 @rtype: L{Transport} 391 """ 392 return self.transport
393
394 - def set_name(self, name):
395 """ 396 Set a name for this channel. Currently it's only used to set the name 397 of the channel in logfile entries. The name can be fetched with the 398 L{get_name} method. 399 400 @param name: new channel name 401 @type name: str 402 """ 403 self._name = name
404
405 - def get_name(self):
406 """ 407 Get the name of this channel that was previously set by L{set_name}. 408 409 @return: the name of this channel. 410 @rtype: str 411 """ 412 return self._name
413
414 - def get_id(self):
415 """ 416 Return the ID # for this channel. The channel ID is unique across 417 a L{Transport} and usually a small number. It's also the number 418 passed to L{ServerInterface.check_channel_request} when determining 419 whether to accept a channel request in server mode. 420 421 @return: the ID of this channel. 422 @rtype: int 423 """ 424 return self.chanid
425
426 - def set_combine_stderr(self, combine):
427 """ 428 Set whether stderr should be combined into stdout on this channel. 429 The default is C{False}, but in some cases it may be convenient to 430 have both streams combined. 431 432 If this is C{False}, and L{exec_command} is called (or C{invoke_shell} 433 with no pty), output to stderr will not show up through the L{recv} 434 and L{recv_ready} calls. You will have to use L{recv_stderr} and 435 L{recv_stderr_ready} to get stderr output. 436 437 If this is C{True}, data will never show up via L{recv_stderr} or 438 L{recv_stderr_ready}. 439 440 @param combine: C{True} if stderr output should be combined into 441 stdout on this channel. 442 @type combine: bool 443 @return: previous setting. 444 @rtype: bool 445 446 @since: 1.1 447 """ 448 data = '' 449 self.lock.acquire() 450 try: 451 old = self.combine_stderr 452 self.combine_stderr = combine 453 if combine and not old: 454 # copy old stderr buffer into primary buffer 455 data = self.in_stderr_buffer.empty() 456 finally: 457 self.lock.release() 458 if len(data) > 0: 459 self._feed(data) 460 return old
461 462 463 ### socket API 464 465
466 - def settimeout(self, timeout):
467 """ 468 Set a timeout on blocking read/write operations. The C{timeout} 469 argument can be a nonnegative float expressing seconds, or C{None}. If 470 a float is given, subsequent channel read/write operations will raise 471 a timeout exception if the timeout period value has elapsed before the 472 operation has completed. Setting a timeout of C{None} disables 473 timeouts on socket operations. 474 475 C{chan.settimeout(0.0)} is equivalent to C{chan.setblocking(0)}; 476 C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}. 477 478 @param timeout: seconds to wait for a pending read/write operation 479 before raising C{socket.timeout}, or C{None} for no timeout. 480 @type timeout: float 481 """ 482 self.timeout = timeout
483
484 - def gettimeout(self):
485 """ 486 Returns the timeout in seconds (as a float) associated with socket 487 operations, or C{None} if no timeout is set. This reflects the last 488 call to L{setblocking} or L{settimeout}. 489 490 @return: timeout in seconds, or C{None}. 491 @rtype: float 492 """ 493 return self.timeout
494
495 - def setblocking(self, blocking):
496 """ 497 Set blocking or non-blocking mode of the channel: if C{blocking} is 0, 498 the channel is set to non-blocking mode; otherwise it's set to blocking 499 mode. Initially all channels are in blocking mode. 500 501 In non-blocking mode, if a L{recv} call doesn't find any data, or if a 502 L{send} call can't immediately dispose of the data, an error exception 503 is raised. In blocking mode, the calls block until they can proceed. An 504 EOF condition is considered "immediate data" for L{recv}, so if the 505 channel is closed in the read direction, it will never block. 506 507 C{chan.setblocking(0)} is equivalent to C{chan.settimeout(0)}; 508 C{chan.setblocking(1)} is equivalent to C{chan.settimeout(None)}. 509 510 @param blocking: 0 to set non-blocking mode; non-0 to set blocking 511 mode. 512 @type blocking: int 513 """ 514 if blocking: 515 self.settimeout(None) 516 else: 517 self.settimeout(0.0)
518
519 - def getpeername(self):
520 """ 521 Return the address of the remote side of this Channel, if possible. 522 This is just a wrapper around C{'getpeername'} on the Transport, used 523 to provide enough of a socket-like interface to allow asyncore to work. 524 (asyncore likes to call C{'getpeername'}.) 525 526 @return: the address if the remote host, if known 527 @rtype: tuple(str, int) 528 """ 529 return self.transport.getpeername()
530
531 - def close(self):
532 """ 533 Close the channel. All future read/write operations on the channel 534 will fail. The remote end will receive no more data (after queued data 535 is flushed). Channels are automatically closed when their L{Transport} 536 is closed or when they are garbage collected. 537 """ 538 self.lock.acquire() 539 try: 540 # only close the pipe when the user explicitly closes the channel. 541 # otherwise they will get unpleasant surprises. (and do it before 542 # checking self.closed, since the remote host may have already 543 # closed the connection.) 544 if self._pipe is not None: 545 self._pipe.close() 546 self._pipe = None 547 548 if not self.active or self.closed: 549 return 550 msgs = self._close_internal() 551 finally: 552 self.lock.release() 553 for m in msgs: 554 if m is not None: 555 self.transport._send_user_message(m)
556
557 - def recv_ready(self):
558 """ 559 Returns true if data is buffered and ready to be read from this 560 channel. A C{False} result does not mean that the channel has closed; 561 it means you may need to wait before more data arrives. 562 563 @return: C{True} if a L{recv} call on this channel would immediately 564 return at least one byte; C{False} otherwise. 565 @rtype: boolean 566 """ 567 return self.in_buffer.read_ready()
568
569 - def recv(self, nbytes):
570 """ 571 Receive data from the channel. The return value is a string 572 representing the data received. The maximum amount of data to be 573 received at once is specified by C{nbytes}. If a string of length zero 574 is returned, the channel stream has closed. 575 576 @param nbytes: maximum number of bytes to read. 577 @type nbytes: int 578 @return: data. 579 @rtype: str 580 581 @raise socket.timeout: if no data is ready before the timeout set by 582 L{settimeout}. 583 """ 584 try: 585 out = self.in_buffer.read(nbytes, self.timeout) 586 except PipeTimeout, e: 587 raise socket.timeout() 588 589 ack = self._check_add_window(len(out)) 590 # no need to hold the channel lock when sending this 591 if ack > 0: 592 m = Message() 593 m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST)) 594 m.add_int(self.remote_chanid) 595 m.add_int(ack) 596 self.transport._send_user_message(m) 597 598 return out
599
600 - def recv_stderr_ready(self):
601 """ 602 Returns true if data is buffered and ready to be read from this 603 channel's stderr stream. Only channels using L{exec_command} or 604 L{invoke_shell} without a pty will ever have data on the stderr 605 stream. 606 607 @return: C{True} if a L{recv_stderr} call on this channel would 608 immediately return at least one byte; C{False} otherwise. 609 @rtype: boolean 610 611 @since: 1.1 612 """ 613 return self.in_stderr_buffer.read_ready()
614
615 - def recv_stderr(self, nbytes):
616 """ 617 Receive data from the channel's stderr stream. Only channels using 618 L{exec_command} or L{invoke_shell} without a pty will ever have data 619 on the stderr stream. The return value is a string representing the 620 data received. The maximum amount of data to be received at once is 621 specified by C{nbytes}. If a string of length zero is returned, the 622 channel stream has closed. 623 624 @param nbytes: maximum number of bytes to read. 625 @type nbytes: int 626 @return: data. 627 @rtype: str 628 629 @raise socket.timeout: if no data is ready before the timeout set by 630 L{settimeout}. 631 632 @since: 1.1 633 """ 634 try: 635 out = self.in_stderr_buffer.read(nbytes, self.timeout) 636 except PipeTimeout, e: 637 raise socket.timeout() 638 639 ack = self._check_add_window(len(out)) 640 # no need to hold the channel lock when sending this 641 if ack > 0: 642 m = Message() 643 m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST)) 644 m.add_int(self.remote_chanid) 645 m.add_int(ack) 646 self.transport._send_user_message(m) 647 648 return out
649
650 - def send_ready(self):
651 """ 652 Returns true if data can be written to this channel without blocking. 653 This means the channel is either closed (so any write attempt would 654 return immediately) or there is at least one byte of space in the 655 outbound buffer. If there is at least one byte of space in the 656 outbound buffer, a L{send} call will succeed immediately and return 657 the number of bytes actually written. 658 659 @return: C{True} if a L{send} call on this channel would immediately 660 succeed or fail 661 @rtype: boolean 662 """ 663 self.lock.acquire() 664 try: 665 if self.closed or self.eof_sent: 666 return True 667 return self.out_window_size > 0 668 finally: 669 self.lock.release()
670
671 - def send(self, s):
672 """ 673 Send data to the channel. Returns the number of bytes sent, or 0 if 674 the channel stream is closed. Applications are responsible for 675 checking that all data has been sent: if only some of the data was 676 transmitted, the application needs to attempt delivery of the remaining 677 data. 678 679 @param s: data to send 680 @type s: str 681 @return: number of bytes actually sent 682 @rtype: int 683 684 @raise socket.timeout: if no data could be sent before the timeout set 685 by L{settimeout}. 686 """ 687 size = len(s) 688 self.lock.acquire() 689 try: 690 size = self._wait_for_send_window(size) 691 if size == 0: 692 # eof or similar 693 return 0 694 m = Message() 695 m.add_byte(chr(MSG_CHANNEL_DATA)) 696 m.add_int(self.remote_chanid) 697 m.add_string(s[:size]) 698 finally: 699 self.lock.release() 700 # Note: We release self.lock before calling _send_user_message. 701 # Otherwise, we can deadlock during re-keying. 702 self.transport._send_user_message(m) 703 return size
704
705 - def send_stderr(self, s):
706 """ 707 Send data to the channel on the "stderr" stream. This is normally 708 only used by servers to send output from shell commands -- clients 709 won't use this. Returns the number of bytes sent, or 0 if the channel 710 stream is closed. Applications are responsible for checking that all 711 data has been sent: if only some of the data was transmitted, the 712 application needs to attempt delivery of the remaining data. 713 714 @param s: data to send. 715 @type s: str 716 @return: number of bytes actually sent. 717 @rtype: int 718 719 @raise socket.timeout: if no data could be sent before the timeout set 720 by L{settimeout}. 721 722 @since: 1.1 723 """ 724 size = len(s) 725 self.lock.acquire() 726 try: 727 size = self._wait_for_send_window(size) 728 if size == 0: 729 # eof or similar 730 return 0 731 m = Message() 732 m.add_byte(chr(MSG_CHANNEL_EXTENDED_DATA)) 733 m.add_int(self.remote_chanid) 734 m.add_int(1) 735 m.add_string(s[:size]) 736 finally: 737 self.lock.release() 738 # Note: We release self.lock before calling _send_user_message. 739 # Otherwise, we can deadlock during re-keying. 740 self.transport._send_user_message(m) 741 return size
742
743 - def sendall(self, s):
744 """ 745 Send data to the channel, without allowing partial results. Unlike 746 L{send}, this method continues to send data from the given string until 747 either all data has been sent or an error occurs. Nothing is returned. 748 749 @param s: data to send. 750 @type s: str 751 752 @raise socket.timeout: if sending stalled for longer than the timeout 753 set by L{settimeout}. 754 @raise socket.error: if an error occured before the entire string was 755 sent. 756 757 @note: If the channel is closed while only part of the data hase been 758 sent, there is no way to determine how much data (if any) was sent. 759 This is irritating, but identically follows python's API. 760 """ 761 while s: 762 if self.closed: 763 # this doesn't seem useful, but it is the documented behavior of Socket 764 raise socket.error('Socket is closed') 765 sent = self.send(s) 766 s = s[sent:] 767 return None
768
769 - def sendall_stderr(self, s):
770 """ 771 Send data to the channel's "stderr" stream, without allowing partial 772 results. Unlike L{send_stderr}, this method continues to send data 773 from the given string until all data has been sent or an error occurs. 774 Nothing is returned. 775 776 @param s: data to send to the client as "stderr" output. 777 @type s: str 778 779 @raise socket.timeout: if sending stalled for longer than the timeout 780 set by L{settimeout}. 781 @raise socket.error: if an error occured before the entire string was 782 sent. 783 784 @since: 1.1 785 """ 786 while s: 787 if self.closed: 788 raise socket.error('Socket is closed') 789 sent = self.send_stderr(s) 790 s = s[sent:] 791 return None
792
793 - def makefile(self, *params):
794 """ 795 Return a file-like object associated with this channel. The optional 796 C{mode} and C{bufsize} arguments are interpreted the same way as by 797 the built-in C{file()} function in python. 798 799 @return: object which can be used for python file I/O. 800 @rtype: L{ChannelFile} 801 """ 802 return ChannelFile(*([self] + list(params)))
803
804 - def makefile_stderr(self, *params):
805 """ 806 Return a file-like object associated with this channel's stderr 807 stream. Only channels using L{exec_command} or L{invoke_shell} 808 without a pty will ever have data on the stderr stream. 809 810 The optional C{mode} and C{bufsize} arguments are interpreted the 811 same way as by the built-in C{file()} function in python. For a 812 client, it only makes sense to open this file for reading. For a 813 server, it only makes sense to open this file for writing. 814 815 @return: object which can be used for python file I/O. 816 @rtype: L{ChannelFile} 817 818 @since: 1.1 819 """ 820 return ChannelStderrFile(*([self] + list(params)))
821
822 - def fileno(self):
823 """ 824 Returns an OS-level file descriptor which can be used for polling, but 825 but I{not} for reading or writing. This is primaily to allow python's 826 C{select} module to work. 827 828 The first time C{fileno} is called on a channel, a pipe is created to 829 simulate real OS-level file descriptor (FD) behavior. Because of this, 830 two OS-level FDs are created, which will use up FDs faster than normal. 831 (You won't notice this effect unless you have hundreds of channels 832 open at the same time.) 833 834 @return: an OS-level file descriptor 835 @rtype: int 836 837 @warning: This method causes channel reads to be slightly less 838 efficient. 839 """ 840 self.lock.acquire() 841 try: 842 if self._pipe is not None: 843 return self._pipe.fileno() 844 # create the pipe and feed in any existing data 845 self._pipe = pipe.make_pipe() 846 p1, p2 = pipe.make_or_pipe(self._pipe) 847 self.in_buffer.set_event(p1) 848 self.in_stderr_buffer.set_event(p2) 849 return self._pipe.fileno() 850 finally: 851 self.lock.release()
852
853 - def shutdown(self, how):
854 """ 855 Shut down one or both halves of the connection. If C{how} is 0, 856 further receives are disallowed. If C{how} is 1, further sends 857 are disallowed. If C{how} is 2, further sends and receives are 858 disallowed. This closes the stream in one or both directions. 859 860 @param how: 0 (stop receiving), 1 (stop sending), or 2 (stop 861 receiving and sending). 862 @type how: int 863 """ 864 if (how == 0) or (how == 2): 865 # feign "read" shutdown 866 self.eof_received = 1 867 if (how == 1) or (how == 2): 868 self.lock.acquire() 869 try: 870 m = self._send_eof() 871 finally: 872 self.lock.release() 873 if m is not None: 874 self.transport._send_user_message(m)
875
876 - def shutdown_read(self):
877 """ 878 Shutdown the receiving side of this socket, closing the stream in 879 the incoming direction. After this call, future reads on this 880 channel will fail instantly. This is a convenience method, equivalent 881 to C{shutdown(0)}, for people who don't make it a habit to 882 memorize unix constants from the 1970s. 883 884 @since: 1.2 885 """ 886 self.shutdown(0)
887
888 - def shutdown_write(self):
889 """ 890 Shutdown the sending side of this socket, closing the stream in 891 the outgoing direction. After this call, future writes on this 892 channel will fail instantly. This is a convenience method, equivalent 893 to C{shutdown(1)}, for people who don't make it a habit to 894 memorize unix constants from the 1970s. 895 896 @since: 1.2 897 """ 898 self.shutdown(1)
899 900 901 ### calls from Transport 902 903
904 - def _set_transport(self, transport):
907
908 - def _set_window(self, window_size, max_packet_size):
909 self.in_window_size = window_size 910 self.in_max_packet_size = max_packet_size 911 # threshold of bytes we receive before we bother to send a window update 912 self.in_window_threshold = window_size // 10 913 self.in_window_sofar = 0 914 self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size)
915
916 - def _set_remote_channel(self, chanid, window_size, max_packet_size):
917 self.remote_chanid = chanid 918 self.out_window_size = window_size 919 self.out_max_packet_size = max(max_packet_size, MIN_PACKET_SIZE) 920 self.active = 1 921 self._log(DEBUG, 'Max packet out: %d bytes' % max_packet_size)
922
923 - def _request_success(self, m):
924 self._log(DEBUG, 'Sesch channel %d request ok' % self.chanid) 925 self.event.set() 926 return
927
928 - def _request_failed(self, m):
929 self.lock.acquire() 930 try: 931 msgs = self._close_internal() 932 finally: 933 self.lock.release() 934 for m in msgs: 935 if m is not None: 936 self.transport._send_user_message(m)
937
938 - def _feed(self, m):
939 if type(m) is str: 940 # passed from _feed_extended 941 s = m 942 else: 943 s = m.get_string() 944 self.in_buffer.feed(s)
945
946 - def _feed_extended(self, m):
947 code = m.get_int() 948 s = m.get_string() 949 if code != 1: 950 self._log(ERROR, 'unknown extended_data type %d; discarding' % code) 951 return 952 if self.combine_stderr: 953 self._feed(s) 954 else: 955 self.in_stderr_buffer.feed(s)
956
957 - def _window_adjust(self, m):
958 nbytes = m.get_int() 959 self.lock.acquire() 960 try: 961 if self.ultra_debug: 962 self._log(DEBUG, 'window up %d' % nbytes) 963 self.out_window_size += nbytes 964 self.out_buffer_cv.notifyAll() 965 finally: 966 self.lock.release()
967
968 - def _handle_request(self, m):
969 key = m.get_string() 970 want_reply = m.get_boolean() 971 server = self.transport.server_object 972 ok = False 973 if key == 'exit-status': 974 self.exit_status = m.get_int() 975 self.status_event.set() 976 ok = True 977 elif key == 'xon-xoff': 978 # ignore 979 ok = True 980 elif key == 'pty-req': 981 term = m.get_string() 982 width = m.get_int() 983 height = m.get_int() 984 pixelwidth = m.get_int() 985 pixelheight = m.get_int() 986 modes = m.get_string() 987 if server is None: 988 ok = False 989 else: 990 ok = server.check_channel_pty_request(self, term, width, height, pixelwidth, 991 pixelheight, modes) 992 elif key == 'shell': 993 if server is None: 994 ok = False 995 else: 996 ok = server.check_channel_shell_request(self) 997 elif key == 'exec': 998 cmd = m.get_string() 999 if server is None: 1000 ok = False 1001 else: 1002 ok = server.check_channel_exec_request(self, cmd) 1003 elif key == 'subsystem': 1004 name = m.get_string() 1005 if server is None: 1006 ok = False 1007 else: 1008 ok = server.check_channel_subsystem_request(self, name) 1009 elif key == 'window-change': 1010 width = m.get_int() 1011 height = m.get_int() 1012 pixelwidth = m.get_int() 1013 pixelheight = m.get_int() 1014 if server is None: 1015 ok = False 1016 else: 1017 ok = server.check_channel_window_change_request(self, width, height, pixelwidth, 1018 pixelheight) 1019 elif key == 'x11-req': 1020 single_connection = m.get_boolean() 1021 auth_proto = m.get_string() 1022 auth_cookie = m.get_string() 1023 screen_number = m.get_int() 1024 if server is None: 1025 ok = False 1026 else: 1027 ok = server.check_channel_x11_request(self, single_connection, 1028 auth_proto, auth_cookie, screen_number) 1029 else: 1030 self._log(DEBUG, 'Unhandled channel request "%s"' % key) 1031 ok = False 1032 if want_reply: 1033 m = Message() 1034 if ok: 1035 m.add_byte(chr(MSG_CHANNEL_SUCCESS)) 1036 else: 1037 m.add_byte(chr(MSG_CHANNEL_FAILURE)) 1038 m.add_int(self.remote_chanid) 1039 self.transport._send_user_message(m)
1040
1041 - def _handle_eof(self, m):
1042 self.lock.acquire() 1043 try: 1044 if not self.eof_received: 1045 self.eof_received = True 1046 self.in_buffer.close() 1047 self.in_stderr_buffer.close() 1048 if self._pipe is not None: 1049 self._pipe.set_forever() 1050 finally: 1051 self.lock.release() 1052 self._log(DEBUG, 'EOF received (%s)', self._name)
1053
1054 - def _handle_close(self, m):
1055 self.lock.acquire() 1056 try: 1057 msgs = self._close_internal() 1058 self.transport._unlink_channel(self.chanid) 1059 finally: 1060 self.lock.release() 1061 for m in msgs: 1062 if m is not None: 1063 self.transport._send_user_message(m)
1064 1065 1066 ### internals... 1067 1068
1069 - def _log(self, level, msg, *args):
1070 self.logger.log(level, "[chan " + self._name + "] " + msg, *args)
1071
1072 - def _wait_for_event(self):
1073 while True: 1074 self.event.wait(0.1) 1075 if self.event.isSet(): 1076 return 1077 if self.closed: 1078 e = self.transport.get_exception() 1079 if e is None: 1080 e = SSHException('Channel closed.') 1081 raise e 1082 return
1083
1084 - def _set_closed(self):
1085 # you are holding the lock. 1086 self.closed = True 1087 self.in_buffer.close() 1088 self.in_stderr_buffer.close() 1089 self.out_buffer_cv.notifyAll() 1090 if self._pipe is not None: 1091 self._pipe.set_forever()
1092
1093 - def _send_eof(self):
1094 # you are holding the lock. 1095 if self.eof_sent: 1096 return None 1097 m = Message() 1098 m.add_byte(chr(MSG_CHANNEL_EOF)) 1099 m.add_int(self.remote_chanid) 1100 self.eof_sent = True 1101 self._log(DEBUG, 'EOF sent (%s)', self._name) 1102 return m
1103
1104 - def _close_internal(self):
1105 # you are holding the lock. 1106 if not self.active or self.closed: 1107 return None, None 1108 m1 = self._send_eof() 1109 m2 = Message() 1110 m2.add_byte(chr(MSG_CHANNEL_CLOSE)) 1111 m2.add_int(self.remote_chanid) 1112 self._set_closed() 1113 # can't unlink from the Transport yet -- the remote side may still 1114 # try to send meta-data (exit-status, etc) 1115 return m1, m2
1116 1127
1128 - def _check_add_window(self, n):
1129 self.lock.acquire() 1130 try: 1131 if self.closed or self.eof_received or not self.active: 1132 return 0 1133 if self.ultra_debug: 1134 self._log(DEBUG, 'addwindow %d' % n) 1135 self.in_window_sofar += n 1136 if self.in_window_sofar <= self.in_window_threshold: 1137 return 0 1138 if self.ultra_debug: 1139 self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar) 1140 out = self.in_window_sofar 1141 self.in_window_sofar = 0 1142 return out 1143 finally: 1144 self.lock.release()
1145
1146 - def _wait_for_send_window(self, size):
1147 """ 1148 (You are already holding the lock.) 1149 Wait for the send window to open up, and allocate up to C{size} bytes 1150 for transmission. If no space opens up before the timeout, a timeout 1151 exception is raised. Returns the number of bytes available to send 1152 (may be less than requested). 1153 """ 1154 # you are already holding the lock 1155 if self.closed or self.eof_sent: 1156 return 0 1157 if self.out_window_size == 0: 1158 # should we block? 1159 if self.timeout == 0.0: 1160 raise socket.timeout() 1161 # loop here in case we get woken up but a different thread has filled the buffer 1162 timeout = self.timeout 1163 while self.out_window_size == 0: 1164 if self.closed or self.eof_sent: 1165 return 0 1166 then = time.time() 1167 self.out_buffer_cv.wait(timeout) 1168 if timeout != None: 1169 timeout -= time.time() - then 1170 if timeout <= 0.0: 1171 raise socket.timeout() 1172 # we have some window to squeeze into 1173 if self.closed or self.eof_sent: 1174 return 0 1175 if self.out_window_size < size: 1176 size = self.out_window_size 1177 if self.out_max_packet_size - 64 < size: 1178 size = self.out_max_packet_size - 64 1179 self.out_window_size -= size 1180 if self.ultra_debug: 1181 self._log(DEBUG, 'window down to %d' % self.out_window_size) 1182 return size
1183 1184
1185 -class ChannelFile (BufferedFile):
1186 """ 1187 A file-like wrapper around L{Channel}. A ChannelFile is created by calling 1188 L{Channel.makefile}. 1189 1190 @bug: To correctly emulate the file object created from a socket's 1191 C{makefile} method, a L{Channel} and its C{ChannelFile} should be able 1192 to be closed or garbage-collected independently. Currently, closing 1193 the C{ChannelFile} does nothing but flush the buffer. 1194 """ 1195
1196 - def __init__(self, channel, mode = 'r', bufsize = -1):
1197 self.channel = channel 1198 BufferedFile.__init__(self) 1199 self._set_mode(mode, bufsize)
1200
1201 - def __repr__(self):
1202 """ 1203 Returns a string representation of this object, for debugging. 1204 1205 @rtype: str 1206 """ 1207 return '<paramiko.ChannelFile from ' + repr(self.channel) + '>'
1208
1209 - def _read(self, size):
1210 return self.channel.recv(size)
1211
1212 - def _write(self, data):
1213 self.channel.sendall(data) 1214 return len(data)
1215 1216
1217 -class ChannelStderrFile (ChannelFile):
1218 - def __init__(self, channel, mode = 'r', bufsize = -1):
1219 ChannelFile.__init__(self, channel, mode, bufsize)
1220
1221 - def _read(self, size):
1222 return self.channel.recv_stderr(size)
1223
1224 - def _write(self, data):
1225 self.channel.sendall_stderr(data) 1226 return len(data)
1227 1228 1229 # vim: set shiftwidth=4 expandtab : 1230