| Benjamin Peterson | 190d56e | 2008-06-11 02:40:25 +0000 | [diff] [blame] | 1 | # | 
# Module to allow spawning of processes on a foreign host
|  | 3 | # | 
|  | 4 | # Depends on `multiprocessing` package -- tested with `processing-0.60` | 
|  | 5 | # | 
| Christian Heimes | eac8071 | 2008-11-28 19:33:33 +0000 | [diff] [blame] | 6 | # Copyright (c) 2006-2008, R Oudkerk | 
|  | 7 | # All rights reserved. | 
|  | 8 | # | 
| Benjamin Peterson | 190d56e | 2008-06-11 02:40:25 +0000 | [diff] [blame] | 9 |  | 
# Public API of this module.
__all__ = [
    'Cluster',
    'Host',
    'get_logger',
    'current_process',
]
|  | 11 |  | 
|  | 12 | # | 
|  | 13 | # Imports | 
|  | 14 | # | 
|  | 15 |  | 
|  | 16 | import sys | 
|  | 17 | import os | 
|  | 18 | import tarfile | 
|  | 19 | import shutil | 
|  | 20 | import subprocess | 
|  | 21 | import logging | 
|  | 22 | import itertools | 
|  | 23 | import Queue | 
|  | 24 |  | 
|  | 25 | try: | 
|  | 26 | import cPickle as pickle | 
|  | 27 | except ImportError: | 
|  | 28 | import pickle | 
|  | 29 |  | 
|  | 30 | from multiprocessing import Process, current_process, cpu_count | 
|  | 31 | from multiprocessing import util, managers, connection, forking, pool | 
|  | 32 |  | 
|  | 33 | # | 
|  | 34 | # Logging | 
|  | 35 | # | 
|  | 36 |  | 
def get_logger():
    '''
    Return the logger used by this module (the 'distributing' logger).
    '''
    return _logger

_logger = logging.getLogger('distributing')
# This logger has its own stream handler (below), so stop records from
# also reaching the root logger's handlers.  The logging attribute is
# spelled `propagate`.
_logger.propagate = 0

_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

# Module-level shortcuts used throughout this file.
info = _logger.info
debug = _logger.debug
|  | 50 |  | 
|  | 51 | # | 
|  | 52 | # Get number of cpus | 
|  | 53 | # | 
|  | 54 |  | 
# Number of cpus on this machine.  It is used as the default slot count for
# a "localhost" `Host` and is reported by remote host managers.
# `cpu_count()` signals an undeterminable count with NotImplementedError.
try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1
|  | 59 |  | 
|  | 60 | # | 
|  | 61 | # Manager type which spawns subprocesses | 
|  | 62 | # | 
|  | 63 |  | 
class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host.

    `from_address()` attaches an instance to a host manager server at a
    given address without starting a new server.
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # Placeholder; `from_address()` replaces it with 'Host-<addr>:<port>'.
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process on the host served by this manager.

        `target`, `args` and `kwargs` are pickled here and unpickled by
        `RemoteProcess._bootstrap()`.  `group` is accepted for signature
        compatibility and is not used.  If `name` is None, a name of the
        form '<host>/Process-<identity>' is generated.  Returns the object
        produced by the registered `_RemoteProcess` factory.
        '''
        main_module = sys.modules['__main__']
        main_path = None
        if hasattr(main_module, '__file__'):
            # Only the base name is sent; the remote side works in its own dir.
            main_path = os.path.basename(main_module.__file__)
        payload = pickle.dumps((target, args, kwargs))
        proxy = self._RemoteProcess(payload, main_path)
        if name is None:
            template = self._name.split('Host-')[-1] + '/Process-%s'
            identity = ':'.join([str(part) for part in proxy.get_identity()])
            name = template % identity
        proxy.set_name(name)
        return proxy

    @classmethod
    def from_address(cls, address, authkey):
        '''
        Return a manager attached to the host manager server at `address`.

        The manager is marked as started, and its `shutdown` is replaced by
        a finalizer that sends 'shutdown' to that server.
        '''
        manager = cls(address, authkey)
        # A 'dummy' request is sent first (presumably to verify the server
        # answers before the manager is marked as started).
        managers.transact(address, authkey, 'dummy')
        manager._state.value = managers.State.STARTED
        manager._name = 'Host-%s:%s' % manager.address
        finalizer_args = (manager._address, manager._authkey, manager._name)
        manager.shutdown = util.Finalize(
            manager, HostManager._finalize_host,
            args=finalizer_args,
            exitpriority=-10
            )
        return manager

    @staticmethod
    def _finalize_host(address, authkey, name):
        # Ask the server at `address` to shut down; `name` is not used here.
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % (self._name,)
|  | 104 |  | 
|  | 105 | # | 
|  | 106 | # Process subclass representing a process on (possibly) a remote machine | 
|  | 107 | # | 
|  | 108 |  | 
class RemoteProcess(Process):
    '''
    Represents a process started on a remote host.

    `data` is a pickled ``(target, args, kwargs)`` tuple, unpickled only by
    `_bootstrap()` after `forking.prepare()` has been given `main_path`.
    `main_path`, if given, must be a bare file name with no directory part.
    '''
    def __init__(self, data, main_path):
        # Internal invariant: callers pass os.path.basename() of __main__.
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        forking.prepare({'main_path': self._main_path})
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        '''Return this process's `_identity` (a public method for proxies).'''
        return self._identity

    def set_name(self, name):
        '''
        Set the process name.

        `HostManager.Process()` renames processes through a manager proxy,
        which forwards method calls, so the `name` attribute is also made
        available as a method here.
        '''
        self.name = name
|  | 126 |  | 
# Makes `RemoteProcess` creatable through a `HostManager` via the
# `_RemoteProcess` method that `HostManager.Process()` calls.
HostManager.register('_RemoteProcess', callable=RemoteProcess)
|  | 128 |  | 
|  | 129 | # | 
|  | 130 | # A Pool class that uses a cluster | 
|  | 131 | # | 
|  | 132 |  | 
class DistributedPool(pool.Pool):
    '''
    `pool.Pool` whose workers are created by a `Cluster`.

    Worker processes come from `cluster.Process`, and the task/result
    queues are `_SettableQueue` objects created through the cluster.
    '''

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        # Install the cluster's process factory before the base initializer
        # runs (presumably it spawns workers via `self.Process`).
        self._cluster = cluster
        self.Process = cluster.Process
        worker_count = processes or len(cluster)
        pool.Pool.__init__(self, worker_count, initializer, initargs)

    def _setup_queues(self):
        make_queue = self._cluster._SettableQueue
        self._inqueue = make_queue()
        self._outqueue = make_queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # Replace whatever is queued with `size` None sentinels.
        sentinels = [None] * size
        inqueue.set_contents(sentinels)
|  | 150 |  | 
|  | 151 | # | 
|  | 152 | # Manager type which starts host managers on other machines | 
|  | 153 | # | 
|  | 154 |  | 
def LocalProcess(**kwds):
    '''
    Create a `Process` that runs on the local machine.

    All keyword arguments are passed to `Process`.  The resulting process
    name is prefixed with 'localhost/'.
    '''
    p = Process(**kwds)
    # `name` is the documented, settable attribute of `Process`; there is
    # no `set_name()` method on it.
    p.name = 'localhost/' + p.name
    return p
|  | 159 |  | 
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.

    `hostlist` is a sequence of `Host` objects.  `modules` is an iterable of
    names of already-imported modules whose source files are shipped to the
    remote hosts; this module is always included.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        # Work on a copy so the caller's list is never mutated (this also
        # allows any iterable of module names).
        modules = list(modules)
        if __name__ not in modules:
            modules.append(__name__)
        self._modules = modules
        files = []
        for name in modules:
            path = sys.modules[name].__file__
            # Ship the source file rather than compiled bytecode.
            if path.endswith(('.pyc', '.pyo')):
                path = path[:-4] + '.py'
            files.append(os.path.abspath(path))
        self._files = files

    def start(self):
        '''
        Start this manager, launch host managers on the remote hosts, and
        build the list of slots.
        '''
        managers.SyncManager.start(self)

        # Remote host managers connect back here to report their address and
        # cpu count.  The listener is always closed once they have reported.
        listener = connection.Listener(family='AF_INET', authkey=self._authkey)
        try:
            for i, host in enumerate(self._hostlist):
                host._start_manager(i, self._authkey, listener.address,
                                    self._files)

            for host in self._hostlist:
                if host.hostname != 'localhost':
                    # Replies can arrive in any order; the reported index
                    # identifies which host answered.
                    conn = listener.accept()
                    try:
                        i, address, cpus = conn.recv()
                    finally:
                        conn.close()
                    other_host = self._hostlist[i]
                    other_host.manager = HostManager.from_address(
                        address, self._authkey)
                    other_host.slots = other_host.slots or cpus
                    other_host.Process = other_host.manager.Process
                else:
                    host.slots = host.slots or slot_count
                    host.Process = LocalProcess
        finally:
            listener.close()

        self._slotlist = [
            Slot(host) for host in self._hostlist for _ in range(host.slots)
        ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # Keep the instance-level `shutdown` set by the base start() as
        # `_base_shutdown`, and remove it so `Cluster.shutdown` is used.
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        '''Shut down every remote host manager, then this manager.'''
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs=None):
        '''
        Create a process on the next slot, cycling through all slots.

        `kwargs` defaults to an empty dict; a fresh one is made per call.
        '''
        if kwargs is None:
            kwargs = {}
        slot = next(self._slot_iterator)
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''Return a `DistributedPool` whose workers run on this cluster.'''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
|  | 231 |  | 
|  | 232 | # | 
|  | 233 | # Queue subclass used by distributed pool | 
|  | 234 | # | 
|  | 235 |  | 
class SettableQueue(Queue.Queue):
    '''
    `Queue.Queue` whose entire contents can be replaced in one step.

    `empty()` and `full()` inspect the underlying deque directly.
    '''
    def empty(self):
        return len(self.queue) == 0

    def full(self):
        if self.maxsize <= 0:
            return False
        return len(self.queue) == self.maxsize

    def set_contents(self, contents):
        # `contents` must be at least as long as the number of threads that
        # may be blocked in get(); all of them are woken below.
        with self.not_empty:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notifyAll()
|  | 251 |  | 
# Makes `SettableQueue` creatable through a `Cluster` via the
# `_SettableQueue` method that `DistributedPool._setup_queues()` calls.
Cluster.register('_SettableQueue', callable=SettableQueue)
|  | 253 |  | 
|  | 254 | # | 
|  | 255 | # Class representing a notional cpu in the cluster | 
|  | 256 | # | 
|  | 257 |  | 
class Slot(object):
    '''
    One notional cpu of a cluster.

    Stores the owning `host` and a reference to that host's `Process`
    factory at construction time.
    '''
    def __init__(self, host):
        factory = host.Process
        self.host = host
        self.Process = factory
|  | 262 |  | 
|  | 263 | # | 
|  | 264 | # Host | 
|  | 265 | # | 
|  | 266 |  | 
class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        '''
        Launch a host manager on this host; does nothing for "localhost".

        For a remote host, the startup `files` are copied to a temporary
        directory there.  `ssh` then runs `distributing.main()` in that
        directory, and the bootstrap parameters are pickled to its stdin.
        `index` is this host's position in the cluster's host list, and
        `address` is where the remote manager reports back.
        '''
        if self.hostname == 'localhost':
            return
        tempdir = copy_to_remote_temporary_directory(self.hostname, files)
        debug('startup files copied to %s:%s', self.hostname, tempdir)
        p = subprocess.Popen(
            ['ssh', self.hostname, 'python', '-c',
             '"import os; os.chdir(%r); '
             'from distributing import main; main()"' % tempdir],
            stdin=subprocess.PIPE
            )
        # Parameters read by `main()` on the remote side.
        data = dict(
            name='BootstrappingHost', index=index,
            dist_log_level=_logger.getEffectiveLevel(),
            dir=tempdir, authkey=str(authkey), parent_address=address
            )
        pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
        p.stdin.close()
|  | 302 |  | 
|  | 303 | # | 
|  | 304 | # Copy files to remote directory, returning name of directory | 
|  | 305 | # | 
|  | 306 |  | 
# Python 2 program run on the remote host with `python -c` by
# copy_to_remote_temporary_directory().  It reads a gzipped tar stream
# from stdin, extracts it into a new temporary directory (prefix
# 'distrib-'), and prints that directory's name.
# The leading and trailing double quotes are part of the string,
# presumably so the remote shell that ssh invokes keeps the multi-line
# program as a single argument.
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''
|  | 316 |  | 
def copy_to_remote_temporary_directory(host, files):
    '''
    Copy `files` into a new temporary directory on `host` over ssh.

    The files are streamed as a gzipped tar archive to a remote Python
    running `unzip_code`, stored under their base names, and the directory
    name printed by the remote side is returned.

    Raises RuntimeError if the remote command exits with a non-zero status.
    '''
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        tf.add(name, os.path.basename(name))
    tf.close()
    # Closing stdin signals end-of-archive to the remote extractor.
    p.stdin.close()
    tempdir = p.stdout.read().rstrip()
    p.stdout.close()
    # Reap the ssh child instead of leaving a zombie.  Also fail loudly
    # rather than returning an empty or partial directory name.
    status = p.wait()
    if status != 0:
        raise RuntimeError(
            'copying files to %s failed (exit status %d)' % (host, status))
    return tempdir
|  | 328 |  | 
|  | 329 | # | 
|  | 330 | # Code which runs a host manager | 
|  | 331 | # | 
|  | 332 |  | 
def main():
    '''
    Run a host manager; `Host._start_manager()` starts this via ssh.

    Steps:
    - Read the bootstrap dict pickled by the parent from stdin.
    - Create a `HostManager` server.
    - Report the server's address and cpu count back to the parent.
    - Register a finalizer that removes the temporary startup directory.
    - Serve forever.
    '''
    # get data from parent over stdin
    # NOTE(review): unpickling stdin is only acceptable because stdin is
    # the pipe written by the parent's ssh command; never expose it.
    data = pickle.load(sys.stdin)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    try:
        conn.send((data['index'], server.address, slot_count))
    finally:
        conn.close()

    # set name etc; `name` is the settable attribute (no `set_name()`)
    current_process().name = 'Host-%s:%s' % server.address
    util._run_after_forkers()

    # register a cleanup function
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()