#
# Module to allow spawning of processes on foreign hosts
#
# Depends on the `multiprocessing` package -- tested with `processing-0.60`
#

__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']

#
# Imports
#

import sys
import os
import tarfile
import shutil
import subprocess
import logging
import itertools
import queue

import pickle

from multiprocessing import Process, current_process, cpu_count
from multiprocessing import util, managers, connection, forking, pool

#
# Logging
#

def get_logger():
    return _logger

_logger = logging.getLogger('distributing')
_logger.propagate = 0

util.fix_up_logger(_logger)
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

info = _logger.info
debug = _logger.debug

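# The logger returned by get_logger() is a standard `logging` logger, so a
# caller can raise its verbosity before starting a cluster, for example:
#
#     get_logger().setLevel(logging.DEBUG)
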
#
# Get number of cpus
#

try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1

#
# Manager type which spawns subprocesses
#

class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        if hasattr(sys.modules['__main__'], '__file__'):
            main_path = os.path.basename(sys.modules['__main__'].__file__)
        else:
            main_path = None
        data = pickle.dumps((target, args, kwargs))
        p = self._RemoteProcess(data, main_path)
        if name is None:
            temp = self._name.split('Host-')[-1] + '/Process-%s'
            name = temp % ':'.join(map(str, p.get_identity()))
        p.set_name(name)
        return p

    @classmethod
    def from_address(cls, address, authkey):
        manager = cls(address, authkey)
        managers.transact(address, authkey, 'dummy')
        manager._state.value = managers.State.STARTED
        manager._name = 'Host-%s:%s' % manager.address
        manager.shutdown = util.Finalize(
            manager, HostManager._finalize_host,
            args=(manager._address, manager._authkey, manager._name),
            exitpriority=-10
            )
        return manager

    @staticmethod
    def _finalize_host(address, authkey, name):
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
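
# Instances of HostManager are not normally created directly by user code:
# Cluster.start() (below) calls HostManager.from_address() to attach to the
# manager server that main() starts on each remote host.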

#
# Process subclass representing a process on (possibly) a remote machine
#

class RemoteProcess(Process):
    '''
    Represents a process started on a remote host
    '''
    def __init__(self, data, main_path):
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        forking.prepare({'main_path': self._main_path})
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        return self._identity

HostManager.register('_RemoteProcess', RemoteProcess)

#
# A Pool class that uses a cluster
#

class DistributedPool(pool.Pool):

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        self.Process = cluster.Process
        pool.Pool.__init__(self, processes or len(cluster),
                           initializer, initargs)

    def _setup_queues(self):
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        inqueue.set_contents([None] * size)

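# DistributedPool instances are not normally constructed directly; they are
# created by Cluster.Pool() below.
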
#
# Manager type which starts host managers on other machines
#

def LocalProcess(**kwds):
    p = Process(**kwds)
    p.set_name('localhost/' + p.name)
    return p

class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        self._modules = modules
        if __name__ not in modules:
            modules.append(__name__)
        files = [sys.modules[name].__file__ for name in modules]
        for i, file in enumerate(files):
            if file.endswith('.pyc') or file.endswith('.pyo'):
                files[i] = file[:-4] + '.py'
        self._files = [os.path.abspath(file) for file in files]

    def start(self):
        managers.SyncManager.start(self)

        l = connection.Listener(family='AF_INET', authkey=self._authkey)

        for i, host in enumerate(self._hostlist):
            host._start_manager(i, self._authkey, l.address, self._files)

        for host in self._hostlist:
            if host.hostname != 'localhost':
                conn = l.accept()
                i, address, cpus = conn.recv()
                conn.close()
                other_host = self._hostlist[i]
                other_host.manager = HostManager.from_address(address,
                                                              self._authkey)
                other_host.slots = other_host.slots or cpus
                other_host.Process = other_host.manager.Process
            else:
                host.slots = host.slots or slot_count
                host.Process = LocalProcess

        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        slot = next(self._slot_iterator)
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)

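#
# Typical usage looks something like the following sketch (the host names,
# slot counts and worker function here are illustrative only):
#
#     def work(x):
#         print(current_process().name, 'got', x)
#
#     cluster = Cluster([Host('localhost'),
#                        Host('user@node.example.org', slots=4)],
#                       ['__main__'])
#     cluster.start()
#     p = cluster.Process(target=work, args=(1,))
#     p.start()
#     p.join()
#     cluster.shutdown()
#
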
#
# Queue subclass used by distributed pool
#

class SettableQueue(queue.Queue):
    def empty(self):
        return not self.queue
    def full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize
    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        self.not_empty.acquire()
        try:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notify_all()
        finally:
            self.not_empty.release()

Cluster.register('_SettableQueue', SettableQueue)
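
# Registering SettableQueue with Cluster means that the queues created in
# DistributedPool._setup_queues() live in the cluster's manager process and
# are accessed through proxies, so they can be shared with processes running
# on remote hosts.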

#
# Class representing a notional cpu in the cluster
#

class Slot(object):
    def __init__(self, host):
        self.host = host
        self.Process = host.Process

#
# Host
#

class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            data = dict(
                name='BootstrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=bytes(authkey), parent_address=address
                )
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()

#
# Copy files to remote directory, returning name of directory
#

unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin.buffer, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print(tempdir)
"'''

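# The listed files are streamed to the remote host as a gzipped tar archive
# over ssh's stdin; the one-liner above unpacks them into a fresh temporary
# directory and writes that directory's name back on stdout.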
def copy_to_remote_temporary_directory(host, files):
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        tf.add(name, os.path.basename(name))
    tf.close()
    p.stdin.close()
    return p.stdout.read().decode().rstrip()

#
# Code which runs a host manager
#

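# main() runs on each remote host: it is invoked by the ssh command launched
# in Host._start_manager(), reads the pickled bootstrap data from stdin,
# starts a manager server for HostManager, and reports that server's address
# back to the parent cluster over the address it was given.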
def main():
    # get data from parent over stdin
    data = pickle.load(sys.stdin.buffer)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc
    current_process().set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # register a cleanup function
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()