blob: 3e9ffd65ea2aa7d186b01709c815758ab883ed39 [file] [log] [blame]
Benjamin Peterson190d56e2008-06-11 02:40:25 +00001#
2# Module to allow spawning of processes on foreign host
3#
4# Depends on `multiprocessing` package -- tested with `processing-0.60`
5#
6
# Names exported by `from <this module> import *`
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
8
9#
10# Imports
11#
12
13import sys
14import os
15import tarfile
16import shutil
17import subprocess
18import logging
19import itertools
20import Queue
21
22try:
23 import cPickle as pickle
24except ImportError:
25 import pickle
26
27from multiprocessing import Process, current_process, cpu_count
28from multiprocessing import util, managers, connection, forking, pool
29
30#
31# Logging
32#
33
def get_logger():
    '''
    Return the logger object used by this module
    '''
    return _logger

_logger = logging.getLogger('distributing')
# Stop records from being passed on to ancestor loggers.  The attribute
# has to be spelled `propagate`; a misspelt name is silently ignored by
# the `logging` package.
_logger.propagate = 0

# Write records to the default stream of `StreamHandler` using the
# format string provided by `multiprocessing.util`
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

# Module level shortcuts used by the rest of the file
info = _logger.info
debug = _logger.debug
47
48#
49# Get number of cpus
50#
51
# Default number of slots for a host: the number of cpus reported by
# `cpu_count()`, or 1 if it cannot be determined.
try:
    slot_count = cpu_count()
except NotImplementedError:
    # `NotImplementedError` is the exception class; `NotImplemented` is
    # an unrelated constant and cannot be used in an `except` clause.
    slot_count = 1
56
57#
58# Manager type which spawns subprocesses
59#
60
class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # placeholder name; `from_address()` replaces it
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a `_RemoteProcess` through this manager and return it.

        `target`, `args` and `kwargs` are pickled together and handed
        to the remote process along with the base name of the
        `__main__` module's file (or None if `__main__` has no
        `__file__`).  `group` is accepted but not used.  If `name` is
        None a name is built from the host name and the identity of
        the new process.
        '''
        main_module = sys.modules['__main__']
        main_path = None
        if hasattr(main_module, '__file__'):
            main_path = os.path.basename(main_module.__file__)

        payload = pickle.dumps((target, args, kwargs))
        proc = self._RemoteProcess(payload, main_path)

        if name is None:
            # everything after the last 'Host-' in the manager name
            host_part = self._name.split('Host-')[-1]
            identity = ':'.join(map(str, proc.get_identity()))
            name = (host_part + '/Process-%s') % identity
        proc.set_name(name)
        return proc

    @classmethod
    def from_address(cls, address, authkey):
        '''
        Return a manager object for a server which is already
        listening at `address`.
        '''
        host = cls(address, authkey)
        # send a 'dummy' request to the server (presumably to check
        # that it can be reached with this authkey -- confirm)
        managers.transact(address, authkey, 'dummy')
        host._state.value = managers.State.STARTED
        host._name = 'Host-%s:%s' % host.address
        finalizer_args = (host._address, host._authkey, host._name)
        # calling `shutdown` sends a 'shutdown' request to the server
        host.shutdown = util.Finalize(
            host, HostManager._finalize_host,
            args=finalizer_args, exitpriority=-10
            )
        return host

    @staticmethod
    def _finalize_host(address, authkey, name):
        # `name` is accepted but not used
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
101
102#
103# Process subclass representing a process on (possibly) a remote machine
104#
105
class RemoteProcess(Process):
    '''
    Represents a process started on a remote host

    `data` is a pickle of a `(target, args, kwargs)` tuple and
    `main_path` is either a false value or the bare file name (no
    directory part) of the main module.
    '''
    def __init__(self, data, main_path):
        # only a plain file name is accepted, never a path
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        forking.prepare({'main_path': self._main_path})
        # NOTE: unpickling can execute arbitrary code, so `data` must
        # only ever come from a trusted parent.
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        '''
        Return the `_identity` attribute of the process
        '''
        return self._identity

    def set_name(self, name):
        '''
        Set the name of the process.

        `HostManager.Process()` calls `set_name()` on the object it
        creates, and `multiprocessing.Process` only offers the `name`
        property, so the method is provided here.
        '''
        self.name = name
123
# Make `RemoteProcess` available on `HostManager` under the type id
# `_RemoteProcess` (used by `HostManager.Process()`)
HostManager.register('_RemoteProcess', RemoteProcess)
125
126#
127# A Pool class that uses a cluster
128#
129
class DistributedPool(pool.Pool):
    '''
    A `Pool` which gets its processes and its queues from a cluster
    '''

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        # these attributes are set before `Pool.__init__()` runs
        self._cluster = cluster
        self.Process = cluster.Process
        if not processes:
            # default to the size of the cluster
            processes = len(cluster)
        pool.Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        # both queues are created through the cluster
        inqueue = self._cluster._SettableQueue()
        outqueue = self._cluster._SettableQueue()
        self._inqueue = inqueue
        self._outqueue = outqueue
        self._quick_put = inqueue.put
        self._quick_get = outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # replace the queue contents by `size` copies of None;
        # `task_handler` is not used
        sentinels = [None] * size
        inqueue.set_contents(sentinels)
147
148#
149# Manager type which starts host managers on other machines
150#
151
def LocalProcess(**kwds):
    '''
    Return a `Process` created from the keyword arguments `kwds`
    whose name has been prefixed with "localhost/"
    '''
    p = Process(**kwds)
    # `name` is a read/write property of `Process`; there is no
    # `set_name()` method to go with it.
    p.name = 'localhost/' + p.name
    return p
156
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.

    `hostlist` is a list of `Host` objects.  `modules` is a list of
    names of already imported modules whose source files are copied to
    the remote hosts; this module is always included.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        # work on a copy so that the caller's list is left untouched
        modules = list(modules)
        if __name__ not in modules:
            modules.append(__name__)
        self._modules = modules
        files = []
        for name in modules:
            path = sys.modules[name].__file__
            # copy the source file rather than the compiled one
            if path.endswith(('.pyc', '.pyo')):
                path = path[:-4] + '.py'
            files.append(os.path.abspath(path))
        self._files = files

    def start(self):
        '''
        Start this manager and a host manager on every non-local host,
        then build the list of slots.
        '''
        managers.SyncManager.start(self)

        # the remote host managers report back on this listener
        listener = connection.Listener(family='AF_INET',
                                       authkey=self._authkey)
        try:
            for i, host in enumerate(self._hostlist):
                host._start_manager(i, self._authkey, listener.address,
                                    self._files)

            for host in self._hostlist:
                if host.hostname != 'localhost':
                    # reports arrive in any order, so the index which
                    # was sent identifies the host they belong to
                    conn = listener.accept()
                    try:
                        i, address, cpus = conn.recv()
                    finally:
                        conn.close()
                    other_host = self._hostlist[i]
                    other_host.manager = HostManager.from_address(
                        address, self._authkey)
                    other_host.slots = other_host.slots or cpus
                    other_host.Process = other_host.manager.Process
                else:
                    host.slots = host.slots or slot_count
                    host.Process = LocalProcess
        finally:
            # the listener is not needed once every host has reported
            listener.close()

        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # Keep the `shutdown` stored on the instance and remove the
        # instance attribute so that the `shutdown()` method defined
        # below is the one which gets called.
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        '''
        Shut down the managers of the non-local hosts and then this
        manager.
        '''
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process on the next slot, cycling through the slots
        '''
        slot = next(self._slot_iterator)
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''
        Return a `DistributedPool` which uses this cluster
        '''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
228
229#
230# Queue subclass used by distributed pool
231#
232
class SettableQueue(Queue.Queue):
    '''
    Queue whose whole contents can be replaced by `set_contents()`
    '''
    def empty(self):
        # looks at the underlying container directly
        return len(self.queue) == 0

    def full(self):
        # a `maxsize` which is not positive means the queue is never full
        if not self.maxsize > 0:
            return False
        return len(self.queue) == self.maxsize

    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        with self.not_empty:
            pending = self.queue
            pending.clear()
            pending.extend(contents)
            # wake up every thread waiting on `not_empty`
            self.not_empty.notify_all()
248
# Make `SettableQueue` available on `Cluster` under the type id
# `_SettableQueue` (used by `DistributedPool._setup_queues()`)
Cluster.register('_SettableQueue', SettableQueue)
250
251#
252# Class representing a notional cpu in the cluster
253#
254
class Slot(object):
    '''
    Pairs a host with the `Process` factory which that host had when
    the slot was created
    '''
    def __init__(self, host):
        self.Process = host.Process
        self.host = host
259
260#
261# Host
262#
263
class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` is the name of the host.  Any name other than
    "localhost" is logged in to with ssh; to log in as a different
    user use a host name of the form "username@somewhere.org"

    `slots` is the number of slots for processes on the host, which
    affects how often processes will be allocated to this host.
    Normally this should be equal to the number of cpus on that host.
    '''
    def __init__(self, hostname, slots=None):
        self.slots = slots
        self.hostname = hostname

    def _start_manager(self, index, authkey, address, files):
        # nothing needs starting for the local host
        if self.hostname == 'localhost':
            return

        tempdir = copy_to_remote_temporary_directory(self.hostname, files)
        debug('startup files copied to %s:%s', self.hostname, tempdir)

        # the command is wrapped in double quotes for the remote shell
        bootstrap = ('"import os; os.chdir(%r); '
                     'from distributing import main; main()"' % tempdir)
        child = subprocess.Popen(
            ['ssh', self.hostname, 'python', '-c', bootstrap],
            stdin=subprocess.PIPE
            )

        # settings which `main()` reads from its stdin on the remote host
        settings = {
            'name': 'BoostrappingHost',
            'index': index,
            'dist_log_level': _logger.getEffectiveLevel(),
            'dir': tempdir,
            'authkey': str(authkey),
            'parent_address': address,
            }
        pickle.dump(settings, child.stdin, pickle.HIGHEST_PROTOCOL)
        child.stdin.close()
299
300#
301# Copy files to remote directory, returning name of directory
302#
303
# Program text passed to `python -c` on the remote host by
# `copy_to_remote_temporary_directory()`.  It creates a temporary
# directory, extracts the gzipped tar stream arriving on stdin into it
# and prints the name of the directory.  The double quotes at either
# end are part of the string (presumably so that the remote shell
# treats the program as a single argument -- confirm).
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''
313
def copy_to_remote_temporary_directory(host, files):
    '''
    Copy `files` to a new temporary directory on `host` using ssh.

    The files are sent as a gzipped tar stream, each one stored under
    its base name.  Returns what the remote side printed, with
    trailing whitespace removed; `unzip_code` prints the name of the
    directory it created.
    '''
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    try:
        tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
        for name in files:
            tf.add(name, os.path.basename(name))
        tf.close()
        # closing stdin marks the end of the stream for the remote side
        p.stdin.close()
        return p.stdout.read().rstrip()
    finally:
        # release both pipes and reap the ssh child, whether or not the
        # copy succeeded (closing an already closed pipe is harmless)
        p.stdin.close()
        p.stdout.close()
        p.wait()
325
326#
327# Code which runs a host manager
328#
329
def main():
    '''
    Run a host manager.

    Reads a pickled dictionary of settings from stdin, creates a
    server for `HostManager`, reports the index of this host, the
    address of the server and the number of cpus to the parent, and
    then serves requests.
    '''
    # get data from parent over stdin
    # NOTE(security): unpickling can execute arbitrary code, so stdin
    # must only ever be fed by the trusted parent process.
    data = pickle.load(sys.stdin)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc
    # (`name` is a property of the process object; it has no
    # `set_name()` method)
    current_process().name = 'Host-%s:%s' % server.address
    util._run_after_forkers()

    # register a cleanup function
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()
361 server.serve_forever()