blob: 4e8e52a57b88af967f825b32b4d69192024be5cf [file] [log] [blame]
Benjamin Peterson190d56e2008-06-11 02:40:25 +00001#
2# Module to allow spawning of processes on foreign host
3#
4# Depends on `multiprocessing` package -- tested with `processing-0.60`
5#
6
7__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
8
9#
10# Imports
11#
12
13import sys
14import os
15import tarfile
16import shutil
17import subprocess
18import logging
19import itertools
20import Queue
21
22try:
23 import cPickle as pickle
24except ImportError:
25 import pickle
26
27from multiprocessing import Process, current_process, cpu_count
28from multiprocessing import util, managers, connection, forking, pool
29
30#
31# Logging
32#
33
def get_logger():
    '''Return the module-level logger used by ``distributing``.'''
    return _logger
36
_logger = logging.getLogger('distributing')
# Keep records out of the root logger's handlers -- this module installs its
# own StreamHandler below.  The original spelled the attribute 'propogate',
# which silently created a dead attribute and left propagation enabled
# (causing duplicated log output once the root logger is configured).
_logger.propagate = 0

util.fix_up_logger(_logger)
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

# Convenience shortcuts bound to the module logger.
info = _logger.info
debug = _logger.debug
48
49#
50# Get number of cpus
51#
52
try:
    slot_count = cpu_count()
except NotImplementedError:
    # cpu_count() raises NotImplementedError (the exception *class*) when the
    # platform cannot report a count.  The original caught the NotImplemented
    # singleton, which can never match a raised exception, so the fallback
    # below was unreachable and the exception escaped.
    slot_count = 1
57
58#
59# Manager type which spawns subprocesses
60#
61
class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host

    Instances are normally obtained with `from_address()`, which attaches
    to an already-running manager server on the remote machine.
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # Placeholder; from_address() overwrites this with 'Host-<ip>:<port>'.
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        # Create a process that will run on this manager's host.  The target
        # callable and its arguments travel pickled; the basename of the
        # __main__ script (if any) is sent along so the remote side can
        # import it before unpickling.  `group` is accepted for signature
        # compatibility with multiprocessing.Process but is not used.
        if hasattr(sys.modules['__main__'], '__file__'):
            main_path = os.path.basename(sys.modules['__main__'].__file__)
        else:
            main_path = None
        data = pickle.dumps((target, args, kwargs))
        p = self._RemoteProcess(data, main_path)
        if name is None:
            # Default name '<host>/Process-<id>' built from this manager's
            # name and the remote process's identity tuple.
            temp = self._name.split('Host-')[-1] + '/Process-%s'
            name = temp % ':'.join(map(str, p.get_identity()))
        p.set_name(name)
        return p

    @classmethod
    def from_address(cls, address, authkey):
        # Attach to an existing manager server rather than starting one; the
        # 'dummy' transaction just checks the server is alive before we mark
        # our local state as STARTED.
        manager = cls(address, authkey)
        managers.transact(address, authkey, 'dummy')
        manager._state.value = managers.State.STARTED
        manager._name = 'Host-%s:%s' % manager.address
        # Make sure the remote server is told to shut down at interpreter
        # exit (or when manager.shutdown() is called explicitly).
        manager.shutdown = util.Finalize(
            manager, HostManager._finalize_host,
            args=(manager._address, manager._authkey, manager._name),
            exitpriority=-10
            )
        return manager

    @staticmethod
    def _finalize_host(address, authkey, name):
        # Ask the remote manager server to shut itself down.
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
102
103#
104# Process subclass representing a process on (possibly) a remote machine
105#
106
class RemoteProcess(Process):
    '''
    Represents a process started on a remote host

    `data` is the pickled (target, args, kwargs) triple; `main_path`, if
    given, must be a bare filename (no directory part) because the remote
    side resolves it inside its own temporary directory.
    '''
    def __init__(self, data, main_path):
        if main_path:
            assert os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        # Recreate the __main__ environment, then unpickle the real target
        # before handing control to the normal Process bootstrap.
        forking.prepare({'main_path': self._main_path})
        target, args, kwargs = pickle.loads(self._data)
        self._target = target
        self._args = args
        self._kwargs = kwargs
        return Process._bootstrap(self)

    def get_identity(self):
        '''Return the identity tuple assigned by Process.__init__.'''
        return self._identity
124
125HostManager.register('_RemoteProcess', RemoteProcess)
126
127#
128# A Pool class that uses a cluster
129#
130
class DistributedPool(pool.Pool):
    '''
    A Pool whose worker processes are spread over the hosts of a Cluster.
    '''

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        # pool.Pool.__init__ spawns its workers through self.Process, so the
        # cluster's round-robin process factory must be bound *before* the
        # base constructor runs.
        self.Process = cluster.Process
        pool.Pool.__init__(self, processes or len(cluster),
                           initializer, initargs)

    def _setup_queues(self):
        # Use manager-hosted SettableQueue proxies instead of the default
        # pipe-based queues so that workers on remote hosts can reach them.
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # Discard pending tasks and wake `size` workers with None sentinels
        # so that terminate() does not block on outstanding work.
        inqueue.set_contents([None] * size)
148
149#
150# Manager type which starts host managers on other machines
151#
152
def LocalProcess(**kwds):
    '''
    Create a process on the local machine.

    Its name gets a 'localhost/' prefix so it matches the 'host/process'
    naming scheme used for processes on remote hosts.
    '''
    proc = Process(**kwds)
    proc.set_name('localhost/' + proc.get_name())
    return proc
157
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        # Port 0 lets the OS pick a free port for the cluster's own server.
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        self._modules = modules
        # This module itself must always be shipped so the remote bootstrap
        # can do 'from distributing import main'.
        if __name__ not in modules:
            modules.append(__name__)
        files = [sys.modules[name].__file__ for name in modules]
        for i, file in enumerate(files):
            # Ship source files, not compiled bytecode, since the remote
            # python may differ from the local one.
            if file.endswith('.pyc') or file.endswith('.pyo'):
                files[i] = file[:-4] + '.py'
        self._files = [os.path.abspath(file) for file in files]

    def start(self):
        '''Start the local server and a host manager on every host.'''
        managers.SyncManager.start(self)

        # Temporary listener on which each remote bootstrap reports back its
        # manager's address and cpu count.
        l = connection.Listener(family='AF_INET', authkey=self._authkey)

        for i, host in enumerate(self._hostlist):
            host._start_manager(i, self._authkey, l.address, self._files)

        for host in self._hostlist:
            if host.hostname != 'localhost':
                # Replies can arrive in any order; the index `i` sent back by
                # the bootstrap identifies which host a connection belongs to.
                conn = l.accept()
                i, address, cpus = conn.recv()
                conn.close()
                other_host = self._hostlist[i]
                other_host.manager = HostManager.from_address(address,
                                                              self._authkey)
                # An explicit slot count on the Host wins over the remote cpu
                # count.
                other_host.slots = other_host.slots or cpus
                other_host.Process = other_host.manager.Process
            else:
                host.slots = host.slots or slot_count
                host.Process = LocalProcess

        # One Slot per cpu; Process() below hands work out round-robin.
        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # SyncManager.start() installed a finalizer as the instance attribute
        # 'shutdown'; stash it and delete the instance attribute so the
        # class-level shutdown() below (which also stops the per-host
        # managers) becomes reachable again.
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        '''Shut down every remote host manager, then the local server.'''
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        # Allocate the new process to the next slot, cycling round the
        # cluster so work is spread evenly.
        slot = self._slot_iterator.next()
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''Return a DistributedPool whose workers run on this cluster.'''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        # Total number of slots (cpus) in the cluster.
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
229
230#
231# Queue subclass used by distributed pool
232#
233
class SettableQueue(Queue.Queue):
    '''
    Queue whose entire contents can be replaced in one atomic step.

    DistributedPool uses this to flush pending tasks and substitute
    sentinel values when it is being terminated.
    '''

    def empty(self):
        # Unsynchronized peek at the underlying deque.
        return not self.queue

    def full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        with self.not_empty:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notify_all()
249
250Cluster.register('_SettableQueue', SettableQueue)
251
252#
253# Class representing a notional cpu in the cluster
254#
255
class Slot(object):
    '''
    A notional cpu belonging to a Host.

    Process creation is delegated to the owning host's process factory;
    Cluster hands work to slots in round-robin order.
    '''
    def __init__(self, host):
        self.host = host
        self.Process = host.Process
260
261#
262# Host
263#
264
class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        # May be None; Cluster.start() then fills in the cpu count reported
        # by the host itself.
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        # Bootstrap a HostManager server on this host.  For 'localhost' this
        # is a no-op (the cluster uses LocalProcess instead).  Otherwise:
        # copy the startup files over ssh, launch a remote python that runs
        # distributing.main() inside the temp directory, and feed it its
        # configuration as a pickle on stdin.
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            # The inner double quotes survive into the remote shell command
            # line assembled by ssh.
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            # NOTE(review): 'BoostrappingHost' looks like a typo for
            # 'BootstrappingHost'; it is only used as a process name, but the
            # remote side reads this exact string, so it is left unchanged.
            data = dict(
                name='BoostrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=str(authkey), parent_address=address
                )
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()
300
301#
302# Copy files to remote directory, returning name of directory
303#
304
# Source of a Python 2 snippet executed on the remote host via
# ``ssh <host> python -c <unzip_code>``.  The leading/trailing double quotes
# are part of the command-line argument (they survive into the remote shell
# command).  The snippet creates a private temporary directory, chdirs into
# it, unpacks a gzipped tar stream arriving on stdin, and prints the
# directory name so the caller can read it back.  Note the py2-only `print`
# statement: the remote python is assumed to be Python 2.
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''
314
def copy_to_remote_temporary_directory(host, files):
    '''
    Ship `files` to a fresh temporary directory on `host`.

    A python one-liner on the far end (see `unzip_code`) unpacks a gzipped
    tar stream fed to its stdin and prints the directory it created; that
    directory name is read back and returned.
    '''
    proc = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    # Stream the files as a gzipped tar archive straight into the remote
    # process's stdin; each entry is flattened to its basename.
    archive = tarfile.open(fileobj=proc.stdin, mode='w|gz')
    for path in files:
        archive.add(path, os.path.basename(path))
    archive.close()
    proc.stdin.close()
    # The remote snippet prints the temp directory name followed by newline.
    return proc.stdout.read().rstrip()
326
327#
328# Code which runs a host manager
329#
330
def main():
    '''
    Entry point run on a remote host to serve a HostManager.

    Expects a pickled configuration dict on stdin (sent by
    Host._start_manager): keys include 'dist_log_level', 'authkey',
    'parent_address', 'index' and 'dir' (the temporary directory holding
    the shipped files).  Runs until the parent requests shutdown.
    '''
    # get data from parent over stdin
    data = pickle.load(sys.stdin)
    sys.stdin.close()

    # Mirror the parent's logging level and recreate process state (name,
    # authkey, working environment) from the received configuration.
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object; port 0 picks a free port
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent; the index
    # tells the parent which Host this reply belongs to
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc
    current_process().set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # register a cleanup function that removes the temporary directory the
    # bootstrap unpacked into, once the server shuts down / process exits
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager; blocks until told to shut down
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()