blob: afd580bff2b359b435a3d35e90aef445ea136415 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35__all__ = [ 'Client', 'Listener', 'Pipe' ]
36
Antoine Pitrou87cf2202011-05-09 17:04:27 +020037import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import os
39import sys
Antoine Pitrou87cf2202011-05-09 17:04:27 +020040import pickle
41import select
Benjamin Petersone711caf2008-06-11 16:44:04 +000042import socket
Antoine Pitrou87cf2202011-05-09 17:04:27 +020043import struct
Georg Brandl6aa2d1f2008-08-12 08:35:52 +000044import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000045import time
46import tempfile
47import itertools
48
49import _multiprocessing
Antoine Pitrou87cf2202011-05-09 17:04:27 +020050from multiprocessing import current_process, AuthenticationError, BufferTooShort
Benjamin Petersone711caf2008-06-11 16:44:04 +000051from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
Antoine Pitrou87cf2202011-05-09 17:04:27 +020052try:
53 from _multiprocessing import win32
54except ImportError:
55 if sys.platform == 'win32':
56 raise
57 win32 = None
Benjamin Petersone711caf2008-06-11 16:44:04 +000058
59#
60#
61#
62
# Buffer size, in bytes, requested when Windows named pipes are created.
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Supplies the per-process sequence number used in generated pipe names.
_mmap_counter = itertools.count()

# 'AF_INET' is always offered; each additional transport the platform
# supports is appended below and takes over as the default.
families = ['AF_INET']
default_family = 'AF_INET'

if hasattr(socket, 'AF_UNIX'):
    families.append('AF_UNIX')
    default_family = 'AF_UNIX'

if sys.platform == 'win32':
    families.append('AF_PIPE')
    default_family = 'AF_PIPE'
79
Antoine Pitrou45d61a32009-11-13 22:35:18 +000080
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the deadline, on the time.time() clock, that lies `timeout`
    seconds after the moment of the call.
    '''
    deadline = time.time() + timeout
    return deadline
83
def _check_timeout(t):
    '''
    Return True once the current time.time() value is past the deadline `t`.
    '''
    now = time.time()
    return now > t
86
Benjamin Petersone711caf2008-06-11 16:44:04 +000087#
88#
89#
90
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)

    if family == 'AF_UNIX':
        # A not-yet-existing path inside the multiprocessing temp directory.
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())

    if family == 'AF_PIPE':
        # The pid plus a per-process counter keeps generated names distinct.
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                                next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix)

    raise ValueError('unrecognized family')
104
105
def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE':

    * a tuple (or tuple subclass such as a namedtuple) is 'AF_INET';
    * a string starting with two backslashes is 'AF_PIPE';
    * any other string is 'AF_UNIX'.

    Raises ValueError for anything else.
    '''
    # isinstance() rather than an exact type comparison, so that subclasses
    # of tuple and str are classified like their base types.
    if isinstance(address, tuple):
        return 'AF_INET'
    if isinstance(address, str):
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        return 'AF_UNIX'
    # Wrap in a 1-tuple so the %-formatting is safe for any object.
    raise ValueError('address type of %r unrecognized' % (address,))
120
121#
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200122# Connection classes
123#
124
class _ConnectionBase:
    """
    Shared implementation of the public connection API.

    Subclasses provide the transport through four hooks: `_close()`,
    `_send_bytes(buf)`, `_recv_bytes(maxsize=None)` (returning an
    io.BytesIO-like object, or None when the message exceeds `maxsize`)
    and `_poll(timeout)`.
    """

    # Class-level default so __del__ and close() work even when __init__
    # raised before the instance attribute was assigned.
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        """
        Wrap `handle` (anything supporting __index__).

        Raises ValueError if the handle is negative or if the connection
        would be neither readable nor writable.
        """
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        if self._handle is None:
            raise IOError("handle is closed")

    def _check_readable(self):
        if not self._readable:
            raise IOError("connection is write-only")

    def _check_writable(self):
        if not self._writable:
            raise IOError("connection is read-only")

    def _bad_message_length(self):
        # An oversized message was announced: stop reading.  If the
        # connection can still be written to, keep it open write-only;
        # otherwise nothing useful is left, so close it.
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise IOError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Mark as closed even when the underlying close failed.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """
        Send the bytes data from a bytes-like object

        Only `buf[offset:offset + size]` (in bytes) is sent; `size`
        defaults to everything after `offset`.  Raises ValueError when
        `offset`/`size` are negative or fall outside the buffer.
        """
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
        if m.itemsize > 1:
            m = memoryview(bytes(m))
        n = len(m)
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        self._send_bytes(memoryview(buf))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.

        Raises ValueError for a negative `maxlength`, and IOError ("bad
        message length") when the incoming message exceeds `maxlength`.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable buffer-like object.
        Return the number of bytes read.

        `offset` is a byte offset into `buf`.  Raises BufferTooShort
        (carrying the complete message) when the message does not fit.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            # Slice indices are in items of `m`, hence the division.
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE(security): this unpickles whatever the peer sent; only use
        # recv() on connections whose other end is trusted.
        return pickle.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """
        Whether there is any input available to be read

        `timeout` is in seconds; None or a negative value is passed to the
        transport as None.
        """
        self._check_closed()
        self._check_readable()
        # Guard the comparison: `None < 0.0` raises TypeError.
        if timeout is not None and timeout < 0.0:
            timeout = None
        return self._poll(timeout)
270
271
if win32:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        """

        def _close(self):
            win32.CloseHandle(self._handle)

        def _send_bytes(self, buf):
            nwritten = win32.WriteFile(self._handle, buf)
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            """
            Read one message into an io.BytesIO and return it.

            Returns None when the message would exceed `maxsize`; raises
            EOFError when the pipe is broken.
            """
            buf = io.BytesIO()
            bufsize = 512
            if maxsize is not None:
                bufsize = min(bufsize, maxsize)
            try:
                firstchunk, complete = win32.ReadFile(self._handle, bufsize)
            except IOError as e:
                if e.errno == win32.ERROR_BROKEN_PIPE:
                    raise EOFError
                raise
            lenfirstchunk = len(firstchunk)
            buf.write(firstchunk)
            if complete:
                return buf
            # The first read did not cover the whole message: ask how much
            # is left and fetch it with a second read.
            navail, nleft = win32.PeekNamedPipe(self._handle)
            if maxsize is not None and lenfirstchunk + nleft > maxsize:
                return None
            lastchunk, complete = win32.ReadFile(self._handle, nleft)
            assert complete
            buf.write(lastchunk)
            return buf

        def _poll(self, timeout):
            """
            Return True as soon as data is available, or False once
            `timeout` seconds have elapsed.  A `timeout` of None (or a
            negative value) waits indefinitely.
            """
            navail, nleft = win32.PeekNamedPipe(self._handle)
            if navail > 0:
                return True
            elif timeout == 0.0:
                return False
            # Setup a polling loop (translated straight from old
            # pipe_connection.c)
            # poll() passes None for "no deadline"; test for it explicitly
            # because `None < 0.0` raises TypeError.
            if timeout is None or timeout < 0.0:
                deadline = None
            else:
                deadline = time.time() + timeout
            delay = 0.001
            max_delay = 0.02
            while True:
                time.sleep(delay)
                navail, nleft = win32.PeekNamedPipe(self._handle)
                if navail > 0:
                    return True
                if deadline is not None and time.time() > deadline:
                    return False
                # Back off gradually up to max_delay between peeks.
                if delay < max_delay:
                    delay += 0.001
332
333
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    # Choose the platform primitives once, while the class body executes.
    # `_write`/`_read` are captured as default arguments of _send/_recv
    # below, so they are invoked as plain functions rather than as methods.
    if win32:
        def _close(self):
            win32.closesocket(self._handle)
        _write = win32.send
        _read = win32.recv
    else:
        def _close(self):
            os.close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        # Keep writing until every byte of `buf` has been accepted.
        pending = len(buf)
        while True:
            sent = write(self._handle, buf)
            pending -= sent
            if pending == 0:
                return
            buf = buf[sent:]

    def _recv(self, size, read=_read):
        # Gather exactly `size` bytes into a BytesIO.  End of file before
        # the first byte is EOFError; part-way through it is IOError.
        collected = io.BytesIO()
        wanted = size
        while wanted > 0:
            piece = read(self._handle, wanted)
            got = len(piece)
            if got == 0:
                if wanted == size:
                    raise EOFError
                raise IOError("got end of file during message")
            collected.write(piece)
            wanted -= got
        return collected

    def _send_bytes(self, buf):
        # For wire compatibility with 3.2 and lower
        length = len(buf)
        header = struct.pack("=i", len(buf))
        self._send(header)
        # The condition is necessary to avoid "broken pipe" errors
        # when sending a 0-length buffer if the other end closed the pipe.
        if length > 0:
            self._send(buf)

    def _recv_bytes(self, maxsize=None):
        # Each message is preceded by its length as a 4-byte "=i" integer.
        header = self._recv(4)
        (size,) = struct.unpack("=i", header.getvalue())
        if maxsize is None or size <= maxsize:
            return self._recv(size)
        return None

    def _poll(self, timeout):
        ready_to_read, _, _ = select.select([self._handle], [], [], timeout)
        return bool(ready_to_read)
394
395
396#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000397# Public functions
398#
399
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        '''
        Create the underlying socket or named-pipe listener.

        `family` defaults to the type of `address` if given, else to the
        platform default; `address` defaults to an arbitrary free address.
        Raises TypeError if `authkey` is neither None nor a bytes object.
        '''
        # Validate before creating the listener so that a bad authkey does
        # not leave a bound socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  When an authkey was given, the
        challenge/response handshake is run first; if it fails, the new
        connection is closed and the error is re-raised.
        '''
        c = self._listener.accept()
        # `is not None` mirrors the test in Client(), so an empty authkey
        # triggers the handshake on both sides.
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                # The caller never receives `c`, so release it here.
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)
442
443
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the type deduced from `address`.  When `authkey`
    is given (it must be a bytes object, else TypeError is raised) the
    challenge/response handshake is run; if it fails, the connection is
    closed and the error is re-raised.
    '''
    # Validate before connecting so that a bad authkey does not leave an
    # open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            # Reverse of the order used by Listener.accept().
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            # The caller never receives `c`, so release it here.
            c.close()
            raise

    return c
462
463
if sys.platform == 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        pipe_mode = (win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                     win32.PIPE_WAIT)

        # h1 is the pipe created here; h2 is obtained by opening the same
        # address as a file.
        h1 = win32.CreateNamedPipe(
            address, openmode, pipe_mode,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        h2 = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            h2, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(h1, win32.NULL)
        except WindowsError as e:
            # "Already connected" is tolerated; anything else propagates.
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        # When not duplex, the first end is read-only and the second
        # write-only.
        reader = PipeConnection(h1, writable=duplex)
        writer = PipeConnection(h2, readable=duplex)
        return reader, writer

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if not duplex:
            # One-way: c1 is the read end, c2 the write end of an OS pipe.
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)
            return c1, c2

        # Two-way: wrap duplicates of a socket pair's descriptors, then
        # close the socket objects themselves.
        s1, s2 = socket.socketpair()
        c1 = Connection(os.dup(s1.fileno()))
        c2 = Connection(os.dup(s2.fileno()))
        s1.close()
        s2.close()
        return c1, c2
522
523#
524# Definitions for connections based on sockets
525#
526
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        '''
        Bind a new socket of the given `family` (the name of a socket
        module constant such as 'AF_INET') to `address` and start listening.
        '''
        self._socket = socket.socket(getattr(socket, family))
        try:
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except BaseException:
            # Do not leak the socket when setup fails part-way through.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when the listener is closed or
            # finalized.
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Accept one connection and return it as a `Connection` wrapping a
        duplicate of the accepted socket's descriptor.
        '''
        s, self._last_accepted = self._socket.accept()
        try:
            fd = duplicate(s.fileno())
        finally:
            # The duplicate (if any) is independent of `s`; always release
            # the accepted socket object, even if duplication failed.
            s.close()
        return Connection(fd)

    def close(self):
        '''Close the listening socket and remove its file, if any.'''
        self._socket.close()
        if self._unlink is not None:
            self._unlink()
558
559
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    A refused connection is retried every 10ms until the connection
    timeout expires; any other socket error propagates immediately.
    '''
    family = address_type(address)
    with socket.socket(getattr(socket, family)) as s:
        t = _init_timeout()

        # The loop is left only by `break` (connected) or by re-raising,
        # so it needs no `else` clause.
        while True:
            try:
                s.connect(address)
            except socket.error as e:
                if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
                    debug('failed to connect to address %s', address)
                    raise
                time.sleep(0.01)
            else:
                break

        # Duplicate while `s` is still open; the `with` block closes `s`.
        fd = duplicate(s.fileno())
    conn = Connection(fd)
    return conn
584
585#
586# Definitions for connections based on named pipes
587#
588
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            # `backlog` is accepted for signature parity with
            # SocketListener but is not used.
            self._address = address
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            # Pipe instances created but not yet handed out by accept().
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            # close() is the finalizer itself: calling it closes every
            # handle still in the queue.
            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            '''
            Wait for a client on the oldest queued pipe instance and return
            it as a PipeConnection.
            '''
            # Queue a fresh instance first, so one is always pending after
            # the oldest has been handed to a client.
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                # "Already connected" is tolerated; anything else
                # propagates.
                if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                    raise
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Semaphore-timeout and pipe-busy errors are retried until the
        connection timeout expires; any other error propagates immediately.
        '''
        t = _init_timeout()
        # The loop is left only by `break` (opened) or by re-raising, so
        # it needs no `else` clause.
        while True:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
                                     win32.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000662
663#
664# Authentication stuff
665#
666
# Number of random bytes in each challenge.
MESSAGE_LENGTH = 20

# Protocol markers: the challenge prefix and the two possible verdicts.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`.

    Sends CHALLENGE plus a random message, then compares the peer's reply
    with the HMAC-MD5 of that message keyed by `authkey`.  Sends WELCOME on
    a match; otherwise sends FAILURE and raises AuthenticationError.
    Raises TypeError if `authkey` is not a bytes object.
    '''
    import hashlib
    import hmac
    # Explicit check instead of `assert`, which is stripped under -O.
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 was hmac's implicit default when this protocol was defined; name
    # it explicitly so the digest stays the same now that hmac.new()
    # requires a digestmod.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison, so timing does not reveal how much of the
    # reply was correct.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge issued by the peer's deliver_challenge().

    Reads the challenge, replies with its HMAC-MD5 keyed by `authkey`, and
    raises AuthenticationError if the message is not a challenge or the
    peer does not answer WELCOME.  Raises TypeError if `authkey` is not a
    bytes object.
    '''
    import hashlib
    import hmac
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = connection.recv_bytes(256)         # reject large message
    # Explicit check instead of `assert`: this validates data from the
    # peer and must not disappear under -O.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
697
698#
699# Support for using xmlrpclib for serialization
700#
701
class ConnectionWrapper(object):
    '''
    Wrap a connection so that send() and recv() use the given `dumps` and
    `loads` callables, while fileno, close, poll, recv_bytes and
    send_bytes are taken directly from the wrapped connection.
    '''

    _FORWARDED = ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes')

    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the wrapped connection's bound methods as attributes of
        # this instance.
        for name in self._FORWARDED:
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        payload = self._dumps(obj)
        self._conn.send_bytes(payload)

    def recv(self):
        payload = self._conn.recv_bytes()
        return self._loads(payload)
716
def _xml_dumps(obj):
    '''
    Serialize `obj` as a one-parameter XML-RPC payload (None allowed) and
    return it UTF-8 encoded.
    '''
    # Import locally so the function does not depend on another function
    # having bound the module-global `xmlrpclib` beforehand.
    import xmlrpc.client as xmlrpclib
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000719
def _xml_loads(s):
    '''
    Decode the UTF-8 bytes `s` as a one-parameter XML-RPC payload and
    return that parameter.
    '''
    # Import locally so the function does not depend on another function
    # having bound the module-global `xmlrpclib` beforehand.
    import xmlrpc.client as xmlrpclib
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj
723
class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects with the
    _xml_dumps/_xml_loads helpers instead of pickle.
    '''
    def accept(self):
        # Import xmlrpc.client lazily and publish it as the module global
        # `xmlrpclib`.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        accepted = Listener.accept(self)
        wrapped = ConnectionWrapper(accepted, _xml_dumps, _xml_loads)
        return wrapped
730
def XmlClient(*args, **kwds):
    '''
    Like Client(), but the returned connection serializes objects with the
    _xml_dumps/_xml_loads helpers instead of pickle.
    '''
    # Import xmlrpc.client lazily and publish it as the module global
    # `xmlrpclib`.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    connection = Client(*args, **kwds)
    return ConnectionWrapper(connection, _xml_dumps, _xml_loads)
Antoine Pitrou87cf2202011-05-09 17:04:27 +0200735
736
737# Late import because of circular import
738from multiprocessing.forking import duplicate, close