#
# Module to allow spawning of processes on foreign host
#
# Depends on `multiprocessing` package -- tested with `processing-0.60`
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

# Public API of this module.
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
11
#
# Imports
#

import sys
import os
import tarfile
import shutil
import subprocess
import logging
import itertools
import queue
# Historically this tried `cPickle` with a fallback to `pickle`; after the
# Python 3 conversion both branches imported the same module, so the
# try/except was dead code and is collapsed to a plain import.
import pickle
29
from multiprocessing import Process, current_process, cpu_count
# NOTE(review): `forking` and several attributes used later (e.g.
# `managers.transact`, `managers.Server`) are internal APIs of the
# multiprocessing version this module was written against, not public ones.
from multiprocessing import util, managers, connection, forking, pool
32
#
# Logging
#

def get_logger():
    '''
    Return the module-level logger (named 'distributing') configured below.
    '''
    return _logger
39
# Module-level logger.  Propagation to the root logger is disabled so that
# records are emitted only through the stream handler installed below.
# (The original code assigned `_logger.propogate` -- a typo that merely
# created an unused attribute and left propagation enabled.)
_logger = logging.getLogger('distributing')
_logger.propagate = 0

_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

# Convenience aliases used throughout this module.
info = _logger.info
debug = _logger.debug
50
#
# Get number of cpus
#

# `cpu_count()` raises NotImplementedError on platforms where the count
# cannot be determined; fall back to a single slot in that case.  (The
# original caught `NotImplemented`, which is a constant, not an exception
# class, so the fallback could never trigger.)
try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1
59
#
# Manager type which spawns subprocesses
#

class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # Placeholder until from_address() learns the real host address.
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        # Ship the main script's basename so the child can recreate the
        # parent's __main__ module via forking.prepare().
        main_mod = sys.modules['__main__']
        if hasattr(main_mod, '__file__'):
            main_path = os.path.basename(main_mod.__file__)
        else:
            main_path = None
        payload = pickle.dumps((target, args, kwargs))
        proc = self._RemoteProcess(payload, main_path)
        if name is None:
            host_part = self._name.split('Host-')[-1]
            name = (host_part + '/Process-%s'
                    % ':'.join(map(str, proc.get_identity())))
        proc.set_name(name)
        return proc

    @classmethod
    def from_address(cls, address, authkey):
        '''
        Return a manager attached to an already-running server at `address`.
        '''
        hm = cls(address, authkey)
        managers.transact(address, authkey, 'dummy')   # connectivity check
        hm._state.value = managers.State.STARTED
        hm._name = 'Host-%s:%s' % hm.address
        # Tell the remote server to shut down when this process exits.
        hm.shutdown = util.Finalize(
            hm, HostManager._finalize_host,
            args=(hm._address, hm._authkey, hm._name),
            exitpriority=-10
            )
        return hm

    @staticmethod
    def _finalize_host(address, authkey, name):
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
104
#
# Process subclass representing a process on (possibly) a remote machine
#

class RemoteProcess(Process):
    '''
    Represents a process started on a remote host
    '''
    def __init__(self, data, main_path):
        # `data` is a pickled (target, args, kwargs) tuple produced by
        # HostManager.Process(); `main_path` is the basename of the
        # parent's main script, or None when there is no script file.
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        # Runs in the child process: recreate the parent's __main__ module,
        # unpickle the target callable, then defer to the normal bootstrap.
        forking.prepare({'main_path': self._main_path})
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        # `_identity` is assigned by Process.__init__.
        return self._identity
126
# Expose RemoteProcess through HostManager proxies under the private typeid
# used by HostManager.Process() (self._RemoteProcess).
HostManager.register('_RemoteProcess', RemoteProcess)
128
#
# A Pool class that uses a cluster
#

class DistributedPool(pool.Pool):
    '''
    Worker pool whose processes are created through a `Cluster`, so workers
    may run on remote hosts; the task queues are manager-hosted
    `SettableQueue` proxies rather than the usual local queues.
    '''

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        # pool.Pool.__init__ uses self.Process to spawn its workers.
        self.Process = cluster.Process
        # Default pool size is one process per slot in the cluster.
        pool.Pool.__init__(self, processes or len(cluster),
                           initializer, initargs)

    def _setup_queues(self):
        # Override of pool.Pool._setup_queues: create the in/out queues via
        # the cluster manager so that remote workers can reach them.
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # Replace any pending tasks with one sentinel (None) per worker so
        # that every worker wakes up and terminates.
        inqueue.set_contents([None] * size)
150
#
# Manager type which starts host managers on other machines
#

def LocalProcess(**kwds):
    '''
    Factory returning an ordinary `Process` whose name carries a
    'localhost/' prefix, matching the naming scheme of remote processes.
    '''
    process = Process(**kwds)
    process.set_name('localhost/' + process.name)
    return process
159
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        self._modules = modules
        # This module itself must be importable remotely:
        # Host._start_manager runs "from distributing import main" there.
        if __name__ not in modules:
            modules.append(__name__)
        files = [sys.modules[name].__file__ for name in modules]
        for i, file in enumerate(files):
            # Ship sources, not bytecode: map '.pyc'/'.pyo' back to '.py'.
            if file.endswith('.pyc') or file.endswith('.pyo'):
                files[i] = file[:-4] + '.py'
        self._files = [os.path.abspath(file) for file in files]

    def start(self):
        '''
        Start the local manager and one host manager per host in the list.
        '''
        managers.SyncManager.start(self)

        # Listener on which each remote host manager reports back its
        # server address and cpu count.
        l = connection.Listener(family='AF_INET', authkey=self._authkey)

        for i, host in enumerate(self._hostlist):
            host._start_manager(i, self._authkey, l.address, self._files)

        for host in self._hostlist:
            if host.hostname != 'localhost':
                conn = l.accept()
                # Hosts may connect back in any order; the index `i`
                # identifies which host this connection belongs to.
                i, address, cpus = conn.recv()
                conn.close()
                other_host = self._hostlist[i]
                other_host.manager = HostManager.from_address(address,
                                                              self._authkey)
                # An explicitly configured slot count wins over the
                # reported cpu count.
                other_host.slots = other_host.slots or cpus
                other_host.Process = other_host.manager.Process
            else:
                host.slots = host.slots or slot_count
                host.Process = LocalProcess

        # One Slot per cpu; Process() hands work out round-robin over them.
        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # SyncManager.start() installed a `shutdown` finalizer attribute on
        # this instance; stash it and delete the attribute so the
        # class-level shutdown() method below becomes visible again.
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        '''
        Shut down every remote host manager, then the local manager.
        '''
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process on the next slot (round-robin across all hosts).
        '''
        slot = next(self._slot_iterator)
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''
        Return a `DistributedPool` whose workers use this cluster's slots.
        '''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        # Total number of slots across all hosts.
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
231
#
# Queue subclass used by distributed pool
#

class SettableQueue(queue.Queue):
    '''
    Queue whose contents can be replaced wholesale.

    Used by `DistributedPool._help_stuff_finish()` to substitute pending
    tasks with one sentinel per worker thread at shutdown.
    '''
    def empty(self):
        # `self.queue` is the underlying deque; truth-testing it avoids
        # taking the mutex (matches the original lock-free override).
        return not self.queue

    def full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    def set_contents(self, contents):
        '''
        Atomically replace the queued items with `contents`.

        The length of `contents` must be at least as large as the number
        of threads which have potentially called get().
        '''
        # `with` replaces the explicit acquire/try/finally/release, and
        # notify_all() replaces the deprecated camelCase notifyAll().
        with self.not_empty:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notify_all()
251
# Expose SettableQueue through Cluster manager instances under the private
# typeid used by DistributedPool._setup_queues (cluster._SettableQueue()).
Cluster.register('_SettableQueue', SettableQueue)
253
#
# Class representing a notional cpu in the cluster
#

class Slot(object):
    '''
    One notional cpu on a host; creating a process through a slot simply
    delegates to the owning host's process factory.
    '''
    def __init__(self, host):
        self.host, self.Process = host, host.Process
262
#
# Host
#

class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        # slots=None means "decide later": Cluster.start() fills it in
        # with the remote cpu count (or local slot_count for localhost).
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        # Copy `files` to the remote host, launch `main()` there via ssh,
        # and send it the bootstrap data dict over the child's stdin.
        # For 'localhost' nothing needs starting; Cluster.start() uses
        # LocalProcess directly.
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            # NOTE(review): 'BoostrappingHost' looks like a typo for
            # 'BootstrappingHost' -- left untouched since it is a runtime
            # value shipped to the remote side.
            # NOTE(review): str(authkey) -- presumably intended to make the
            # key picklable/printable; verify bytes-vs-str handling matches
            # what main() expects on the remote end.
            data = dict(
                name='BoostrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=str(authkey), parent_address=address
                )
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()
302
#
# Copy files to remote directory, returning name of directory
#

# Python snippet executed on the remote host (via ``python -c``): it unpacks
# a gzipped tar stream arriving on stdin into a fresh temporary directory
# and prints that directory's path for the parent to read back.
# The surrounding double quotes protect the snippet from the remote shell.
# Fixes for Python 3: ``print tempdir`` was Python 2 syntax (a SyntaxError
# remotely), and the tar stream must be read from the binary
# ``sys.stdin.buffer`` rather than the text-mode ``sys.stdin``.
# The archive comes from our own copy_to_remote_temporary_directory(), so
# extracting its members is trusted input here.
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin.buffer, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print(tempdir)
"'''
316
def copy_to_remote_temporary_directory(host, files):
    '''
    Copy `files` into a fresh temporary directory on `host`.

    Streams a gzipped tarball over ssh's stdin to the `unzip_code`
    snippet running remotely, and returns the remote directory name that
    the snippet prints on its stdout.
    '''
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        # Archive under the basename only, so extraction stays flat.
        tf.add(name, os.path.basename(name))
    tf.close()
    # Closing stdin lets the remote tarfile see end-of-stream.
    p.stdin.close()
    # Strip the trailing newline from the printed directory path.
    return p.stdout.read().rstrip()
328
#
# Code which runs a host manager
#

def main():
    '''
    Entry point executed on a remote host (see Host._start_manager):
    start a `HostManager` server, report its address and cpu count back
    to the parent cluster, then serve until told to shut down.
    '''
    # Get data from parent over stdin.  Read from the binary buffer:
    # pickle requires a bytes stream, and Python 3's text-mode sys.stdin
    # would make pickle.load() fail.
    data = pickle.load(sys.stdin.buffer)
    sys.stdin.close()

    # Mirror the parent's logging level and process state.
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # Create server for a `HostManager` object.
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # Report server address and number of cpus back to parent.
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # Set name etc.
    current_process().set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # Register a cleanup function that removes the copied files when the
    # host manager shuts down.
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # Start host manager; blocks until shutdown.
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()