blob: 66076509a1202e7a1b4d8a481f64621a4bfbbf3e [file] [log] [blame]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01001#
2# We use a background thread for sharing fds on Unix, and for sharing sockets on
3# Windows.
4#
5# A client which wants to pickle a resource registers it with the resource
6# sharer and gets an identifier in return. The unpickling process will connect
7# to the resource sharer, sends the identifier and its pid, and then receives
8# the resource.
9#
10
11import os
12import signal
13import socket
14import sys
15import threading
16
17from . import process
Davin Potts54586472016-09-09 18:03:10 -050018from .context import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010019from . import util
20
21__all__ = ['stop']
22
23
24if sys.platform == 'win32':
25 __all__ += ['DupSocket']
26
27 class DupSocket(object):
28 '''Picklable wrapper for a socket.'''
29 def __init__(self, sock):
30 new_sock = sock.dup()
31 def send(conn, pid):
32 share = new_sock.share(pid)
33 conn.send_bytes(share)
34 self._id = _resource_sharer.register(send, new_sock.close)
35
36 def detach(self):
37 '''Get the socket. This should only be called once.'''
38 with _resource_sharer.get_connection(self._id) as conn:
39 share = conn.recv_bytes()
40 return socket.fromshare(share)
41
42else:
43 __all__ += ['DupFd']
44
45 class DupFd(object):
46 '''Wrapper for fd which can be used at any time.'''
47 def __init__(self, fd):
48 new_fd = os.dup(fd)
49 def send(conn, pid):
50 reduction.send_handle(conn, new_fd, pid)
51 def close():
52 os.close(new_fd)
53 self._id = _resource_sharer.register(send, close)
54
55 def detach(self):
56 '''Get the fd. This should only be called once.'''
57 with _resource_sharer.get_connection(self._id) as conn:
58 return reduction.recv_handle(conn)
59
60
61class _ResourceSharer(object):
penguindustin96466302019-05-06 14:57:17 -040062 '''Manager for resources using background thread.'''
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010063 def __init__(self):
64 self._key = 0
65 self._cache = {}
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010066 self._lock = threading.Lock()
67 self._listener = None
68 self._address = None
69 self._thread = None
70 util.register_after_fork(self, _ResourceSharer._afterfork)
71
72 def register(self, send, close):
73 '''Register resource, returning an identifier.'''
74 with self._lock:
75 if self._address is None:
76 self._start()
77 self._key += 1
78 self._cache[self._key] = (send, close)
79 return (self._address, self._key)
80
81 @staticmethod
82 def get_connection(ident):
83 '''Return connection from which to receive identified resource.'''
84 from .connection import Client
85 address, key = ident
86 c = Client(address, authkey=process.current_process().authkey)
87 c.send((key, os.getpid()))
88 return c
89
90 def stop(self, timeout=None):
91 '''Stop the background thread and clear registered resources.'''
92 from .connection import Client
93 with self._lock:
94 if self._address is not None:
95 c = Client(self._address,
96 authkey=process.current_process().authkey)
97 c.send(None)
98 c.close()
99 self._thread.join(timeout)
100 if self._thread.is_alive():
101 util.sub_warning('_ResourceSharer thread did '
102 'not stop when asked')
103 self._listener.close()
104 self._thread = None
105 self._address = None
106 self._listener = None
107 for key, (send, close) in self._cache.items():
108 close()
109 self._cache.clear()
110
111 def _afterfork(self):
112 for key, (send, close) in self._cache.items():
113 close()
114 self._cache.clear()
Dong-hee Naa5900ec2020-04-15 01:35:36 +0900115 self._lock._at_fork_reinit()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100116 if self._listener is not None:
117 self._listener.close()
118 self._listener = None
119 self._address = None
120 self._thread = None
121
122 def _start(self):
123 from .connection import Listener
Allen W. Smith, Ph.Dbd73e722017-08-29 17:52:18 -0500124 assert self._listener is None, "Already have Listener"
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100125 util.debug('starting listener and thread for sending handles')
126 self._listener = Listener(authkey=process.current_process().authkey)
127 self._address = self._listener.address
128 t = threading.Thread(target=self._serve)
129 t.daemon = True
130 t.start()
131 self._thread = t
132
133 def _serve(self):
134 if hasattr(signal, 'pthread_sigmask'):
Antoine Pitrou9d3627e2018-05-04 13:00:50 +0200135 signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100136 while 1:
137 try:
138 with self._listener.accept() as conn:
139 msg = conn.recv()
140 if msg is None:
141 break
142 key, destination_pid = msg
143 send, close = self._cache.pop(key)
144 try:
145 send(conn, destination_pid)
146 finally:
147 close()
148 except:
149 if not util.is_exiting():
150 sys.excepthook(*sys.exc_info())
151
152
153_resource_sharer = _ResourceSharer()
154stop = _resource_sharer.stop