1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """
20 Attempt to generalize the "feeder" part of a Channel: an object which can be
21 read from and closed, but is reading from a buffer fed by another thread. The
22 read operations are blocking and can have a timeout set.
23 """
24
25 import array
26 import threading
27 import time
28
29
31 """
32 Indicates that a timeout was reached on a read from a L{BufferedPipe}.
33 """
34 pass
35
36
38 """
39 A buffer that obeys normal read (with timeout) & close semantics for a
40 file or socket, but is fed data from another thread. This is used by
41 L{Channel}.
42 """
43
45 self._lock = threading.Lock()
46 self._cv = threading.Condition(self._lock)
47 self._event = None
48 self._buffer = array.array('B')
49 self._closed = False
50
52 """
53 Set an event on this buffer. When data is ready to be read (or the
54 buffer has been closed), the event will be set. When no data is
55 ready, the event will be cleared.
56
57 @param event: the event to set/clear
58 @type event: Event
59 """
60 self._event = event
61 if len(self._buffer) > 0:
62 event.set()
63 else:
64 event.clear()
65
def feed(self, data):
    """
    Feed new data into this pipe.  This method is assumed to be called
    from a separate thread, so synchronization is done.

    The event registered on this pipe (if any) is set, the bytes are
    appended to the internal buffer, and every reader blocked on the
    condition variable is woken up.

    @param data: the data to add
    @type data: str (a byte string; C{bytes} on Python 3)
    """
    with self._lock:
        if self._event is not None:
            self._event.set()
        # array.fromstring() was removed in Python 3.9; frombytes() is its
        # replacement.  Fall back to the old name on interpreters that
        # predate frombytes().
        append_bytes = getattr(self._buffer, 'frombytes', None)
        if append_bytes is None:
            append_bytes = self._buffer.fromstring
        append_bytes(data)
        # notify_all() is the non-deprecated spelling of notifyAll().
        self._cv.notify_all()
82
84 """
85 Returns true if data is buffered and ready to be read from this
86 feeder. A C{False} result does not mean that the feeder has closed;
87 it means you may need to wait before more data arrives.
88
89 @return: C{True} if a L{read} call would immediately return at least
90 one byte; C{False} otherwise.
91 @rtype: bool
92 """
93 self._lock.acquire()
94 try:
95 if len(self._buffer) == 0:
96 return False
97 return True
98 finally:
99 self._lock.release()
100
def read(self, nbytes, timeout=None):
    """
    Read data from the pipe.  The return value is a string representing
    the data received.  The maximum amount of data to be received at once
    is specified by C{nbytes}.  If a string of length zero is returned,
    the pipe has been closed.

    The optional C{timeout} argument can be a nonnegative float expressing
    seconds, or C{None} for no timeout.  If a float is given, a
    C{PipeTimeout} will be raised if the timeout period value has
    elapsed before any data arrives.  A timeout of C{0.0} never blocks:
    it raises immediately when no data is buffered and the pipe is open.

    @param nbytes: maximum number of bytes to read
    @type nbytes: int
    @param timeout: maximum seconds to wait (or C{None}, the default, to
        wait forever)
    @type timeout: float
    @return: data
    @rtype: str (a byte string; C{bytes} on Python 3)

    @raise PipeTimeout: if a timeout was specified and no data was ready
        before that timeout
    """
    # bytes() is '' on Python 2 and b'' on Python 3, so the "closed" result
    # always has the same type as real data.
    out = bytes()
    with self._lock:
        if len(self._buffer) == 0:
            if self._closed:
                return out
            # Non-blocking read with nothing available.
            if timeout == 0.0:
                raise PipeTimeout()
            # Loop in case we are woken up but another thread has already
            # taken everything that was in the buffer.
            while (len(self._buffer) == 0) and not self._closed:
                started = time.time()
                self._cv.wait(timeout)
                if timeout is not None:
                    timeout -= time.time() - started
                    # Only report a timeout when there is still nothing to
                    # hand back; data (or a close) that landed while the
                    # lock was being reacquired must not be masked.
                    if (timeout <= 0.0 and len(self._buffer) == 0
                            and not self._closed):
                        raise PipeTimeout()

        # Something is buffered, or the pipe was closed while we waited.
        drained = len(self._buffer) <= nbytes
        if drained:
            chunk = self._buffer
        else:
            chunk = self._buffer[:nbytes]
        # array.tostring() was removed in Python 3.9; tobytes() is its
        # replacement.  Fall back to the old name on older interpreters.
        to_bytes = getattr(chunk, 'tobytes', None) or chunk.tostring
        out = to_bytes()
        if drained:
            del self._buffer[:]
            # Nothing left to read: clear the event unless the pipe is
            # closed, in which case it stays set.
            if (self._event is not None) and not self._closed:
                self._event.clear()
        else:
            del self._buffer[:nbytes]

    return out
156
158 """
159 Clear out the buffer and return all data that was in it.
160
161 @return: any data that was in the buffer prior to clearing it out
162 @rtype: str
163 """
164 self._lock.acquire()
165 try:
166 out = self._buffer.tostring()
167 del self._buffer[:]
168 if (self._event is not None) and not self._closed:
169 self._event.clear()
170 return out
171 finally:
172 self._lock.release()
173
175 """
176 Close this pipe object. Future calls to L{read} after the buffer
177 has been emptied will return immediately with an empty string.
178 """
179 self._lock.acquire()
180 try:
181 self._closed = True
182 self._cv.notifyAll()
183 if self._event is not None:
184 self._event.set()
185 finally:
186 self._lock.release()
187
189 """
190 Return the number of bytes buffered.
191
192 @return: number of bytes buffered
193 @rtype: int
194 """
195 self._lock.acquire()
196 try:
197 return len(self._buffer)
198 finally:
199 self._lock.release()
200