#
# Module to allow spawning of processes on foreign hosts
#
# Depends on the `multiprocessing` package -- tested with `processing-0.60`
#
# Names exported by a star-import of this module.
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
| # | |
| # Imports | |
| # | |
| import sys | |
| import os | |
| import tarfile | |
| import shutil | |
| import subprocess | |
| import logging | |
| import itertools | |
| import Queue | |
| try: | |
| import cPickle as pickle | |
| except ImportError: | |
| import pickle | |
| from multiprocessing import Process, current_process, cpu_count | |
| from multiprocessing import util, managers, connection, forking, pool | |
| # | |
| # Logging | |
| # | |
def get_logger():
    '''
    Return the logger used by this module.
    '''
    return _logger


_logger = logging.getLogger('distributing')
# Do not hand records on to ancestor loggers: this logger gets its own
# handler below.  The attribute used to be misspelled "propogate", which
# merely created an unused attribute and left propagation switched on.
_logger.propagate = 0
util.fix_up_logger(_logger)
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

# Module-level shortcuts used by the rest of this file.
info = _logger.info
debug = _logger.debug
| # | |
| # Get number of cpus | |
| # | |
# `cpu_count()` raises NotImplementedError when the number of cpus cannot
# be determined; fall back to a single slot in that case.  (The handler
# used to name the `NotImplemented` constant, which is not an exception
# class and so never matched.)
try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1
| # | |
| # Manager type which spawns subprocesses | |
| # | |
class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''

    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # Placeholder; `from_address()` replaces it with 'Host-<address>'.
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process object through this manager and return it.

        `target`, `args` and `kwargs` are pickled and handed to
        `self._RemoteProcess` together with the base name of the main
        module's file (or `None` if `__main__` has no `__file__`).
        `group` is accepted but not used.  When `name` is `None` a name
        of the form '<host>/Process-<identity>' is generated.
        '''
        main_module = sys.modules['__main__']
        main_path = None
        if hasattr(main_module, '__file__'):
            main_path = os.path.basename(main_module.__file__)

        payload = pickle.dumps((target, args, kwargs))
        proc = self._RemoteProcess(payload, main_path)

        if name is None:
            host_label = self._name.split('Host-')[-1]
            identity = ':'.join(map(str, proc.get_identity()))
            name = (host_label + '/Process-%s') % identity
        proc.set_name(name)
        return proc

    @classmethod
    def from_address(cls, address, authkey):
        '''
        Return a manager object for a server already listening at `address`.

        The manager is marked as started without launching a server of its
        own, and its `shutdown` attribute is set to a finalizer which sends
        'shutdown' to the server.
        '''
        host = cls(address, authkey)
        # NOTE(review): presumably a round trip to check that the server
        # answers before it is treated as started -- confirm.
        managers.transact(address, authkey, 'dummy')
        host._state.value = managers.State.STARTED
        host._name = 'Host-%s:%s' % host.address
        finalizer_args = (host._address, host._authkey, host._name)
        host.shutdown = util.Finalize(
            host, HostManager._finalize_host,
            args=finalizer_args, exitpriority=-10
        )
        return host

    @staticmethod
    def _finalize_host(address, authkey, name):
        # `name` is part of the finalizer's argument tuple but is not used.
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
| # | |
| # Process subclass representing a process on (possibly) a remote machine | |
| # | |
class RemoteProcess(Process):
    '''
    Represents a process started on a remote host
    '''

    def __init__(self, data, main_path):
        # `main_path`, when given, must be a bare file name without any
        # directory component.
        if main_path:
            assert os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        # Prepare the process using the main module's file name before
        # unpickling, then install the unpickled target and arguments and
        # continue with the normal bootstrap.
        forking.prepare({'main_path': self._main_path})
        target, args, kwargs = pickle.loads(self._data)
        self._target = target
        self._args = args
        self._kwargs = kwargs
        return Process._bootstrap(self)

    def get_identity(self):
        '''
        Return the `_identity` attribute of the process.
        '''
        return self._identity
# Make `RemoteProcess` available through `HostManager` under the typeid
# `_RemoteProcess`; `HostManager.Process()` creates its processes by
# calling `self._RemoteProcess(...)`.
HostManager.register('_RemoteProcess', RemoteProcess)
| # | |
| # A Pool class that uses a cluster | |
| # | |
class DistributedPool(pool.Pool):
    '''
    A pool whose worker processes and queues are created by a cluster.
    '''

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        # Set these before calling the base initializer: `_setup_queues()`
        # reads `self._cluster` (presumably it is invoked from there).
        self._cluster = cluster
        self.Process = cluster.Process
        if not processes:
            # default to the number of slots in the cluster
            processes = len(cluster)
        pool.Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        # Both queues are created through the cluster.
        make_queue = self._cluster._SettableQueue
        self._inqueue = make_queue()
        self._outqueue = make_queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # Replace whatever is queued with `size` `None` entries;
        # `task_handler` is not used.
        sentinels = [None] * size
        inqueue.set_contents(sentinels)
| # | |
| # Manager type which starts host managers on other machines | |
| # | |
def LocalProcess(**kwds):
    '''
    Create a `Process` from `kwds` with 'localhost/' prefixed to its name.
    '''
    # NOTE(review): `set_name()` is the older process API; the standard
    # library `Process` exposes `name` as a property -- confirm which
    # package version this is meant to run against.
    proc = Process(**kwds)
    local_name = 'localhost/' + proc.name
    proc.set_name(local_name)
    return proc
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''

    def __init__(self, hostlist, modules):
        '''
        `hostlist` is a sequence of `Host` objects.

        `modules` is a sequence of names of already imported modules whose
        source files are to be copied to the remote hosts.  This module is
        always included.  The caller's sequence is not modified.
        '''
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        # Work on a copy so that appending this module's name does not
        # leak into the list owned by the caller.
        modules = list(modules)
        if __name__ not in modules:
            modules.append(__name__)
        self._modules = modules
        self._files = []
        for name in modules:
            path = sys.modules[name].__file__
            # Ship the source file rather than the compiled byte code.
            if path.endswith(('.pyc', '.pyo')):
                path = path[:-4] + '.py'
            self._files.append(os.path.abspath(path))

    def start(self):
        '''
        Start the local manager, then a manager for every host, and build
        the list of slots.

        Each non-local host reports back `(index, address, cpus)` over a
        temporary listener; `cpus` is used as the number of slots when the
        host was not given one explicitly.
        '''
        managers.SyncManager.start(self)

        listener = connection.Listener(family='AF_INET',
                                       authkey=self._authkey)
        try:
            for i, host in enumerate(self._hostlist):
                host._start_manager(i, self._authkey, listener.address,
                                    self._files)

            for host in self._hostlist:
                if host.hostname != 'localhost':
                    # Reports arrive in whatever order the hosts connect,
                    # so use the index sent back, not the loop variable.
                    conn = listener.accept()
                    try:
                        i, address, cpus = conn.recv()
                    finally:
                        conn.close()
                    other_host = self._hostlist[i]
                    other_host.manager = HostManager.from_address(
                        address, self._authkey
                    )
                    other_host.slots = other_host.slots or cpus
                    other_host.Process = other_host.manager.Process
                else:
                    host.slots = host.slots or slot_count
                    host.Process = LocalProcess
        finally:
            # The listener is only needed while the hosts report back.
            listener.close()

        self._slotlist = [
            Slot(host) for host in self._hostlist for _ in range(host.slots)
        ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # `SyncManager.start()` left a `shutdown` attribute on the instance;
        # keep it under another name and delete it so that the `shutdown()`
        # method below is the one found on the instance.
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        '''
        Shut down the managers of all non-local hosts, then this manager.
        '''
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process on the next slot, cycling through all slots.
        '''
        slot = next(self._slot_iterator)
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
        )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''
        Return a `DistributedPool` which uses this cluster.
        '''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
| # | |
| # Queue subclass used by distributed pool | |
| # | |
class SettableQueue(Queue.Queue):
    '''
    Queue whose whole contents can be replaced in a single operation.
    '''

    def empty(self):
        return len(self.queue) == 0

    def full(self):
        # an unbounded queue (maxsize <= 0) is never full
        return 0 < self.maxsize == len(self.queue)

    def set_contents(self, contents):
        '''
        Discard everything queued, enqueue `contents` and wake all waiters.
        '''
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        with self.not_empty:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notify_all()
# Make `SettableQueue` available through `Cluster` under the typeid
# `_SettableQueue`; `DistributedPool._setup_queues()` creates its queues
# by calling `cluster._SettableQueue()`.
Cluster.register('_SettableQueue', SettableQueue)
| # | |
| # Class representing a notional cpu in the cluster | |
| # | |
class Slot(object):
    '''
    One notional cpu of the cluster, bound to the host which provides it.
    '''

    def __init__(self, host):
        # Creating a process on a slot means creating it on the slot's host.
        self.host, self.Process = host, host.Process
| # | |
| # Host | |
| # | |
class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''

    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        '''
        Start a host manager on this host; does nothing for "localhost".

        `files` are copied to a temporary directory on the host, python
        is started there over ssh to run `distributing.main()`, and the
        start-up data is pickled to its stdin.
        '''
        if self.hostname == 'localhost':
            return

        tempdir = copy_to_remote_temporary_directory(self.hostname, files)
        debug('startup files copied to %s:%s', self.hostname, tempdir)

        # The outer double quotes are part of the argument handed to ssh.
        bootstrap = ('"import os; os.chdir(%r); '
                     'from distributing import main; main()"' % tempdir)
        remote = subprocess.Popen(
            ['ssh', self.hostname, 'python', '-c', bootstrap],
            stdin=subprocess.PIPE
        )

        # Start-up data read by `main()` from its stdin.
        startup = {
            'name': 'BoostrappingHost',
            'index': index,
            'dist_log_level': _logger.getEffectiveLevel(),
            'dir': tempdir,
            'authkey': str(authkey),
            'parent_address': address,
        }
        pickle.dump(startup, remote.stdin, pickle.HIGHEST_PROTOCOL)
        remote.stdin.close()
| # | |
| # Copy files to remote directory, returning name of directory | |
| # | |
# Program run by the remote python: it unpacks a gzipped tar stream read
# from stdin into a fresh temporary directory and prints the directory
# name.  The surrounding double quotes are part of the argument handed to
# ssh.
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''


def copy_to_remote_temporary_directory(host, files):
    '''
    Copy `files` to a new temporary directory on `host` and return its name.

    The files are streamed as a gzipped tar archive over ssh to a remote
    python running `unzip_code`; each file is stored under its base name.

    Raises `RuntimeError` if ssh exits with a non-zero status or no
    directory name is reported.  Previously such a failure silently
    returned an empty name.
    '''
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
    )
    try:
        tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
        for name in files:
            tf.add(name, os.path.basename(name))
        tf.close()
        # Closing stdin signals end of archive to the remote side, which
        # prints the directory name only after extracting everything.
        p.stdin.close()
        tempdir = p.stdout.read().rstrip()
    finally:
        # Always release both pipes and reap the ssh child, even when
        # building the archive failed part way through.
        p.stdin.close()
        p.stdout.close()
        status = p.wait()
    if status != 0 or not tempdir:
        raise RuntimeError(
            'copying startup files to %s failed (ssh exit status %s)'
            % (host, status)
        )
    return tempdir
| # | |
| # Code which runs a host manager | |
| # | |
def main():
    '''
    Run a host manager server; started on a host by `Host._start_manager()`.

    Reads the pickled start-up data from stdin, reports the server address
    and the number of cpus back to the parent, then serves until shut down.
    '''
    # get data from parent over stdin
    # NOTE(review): this unpickles whatever arrives on stdin; it is only
    # safe as long as the stream really comes from the parent -- confirm.
    config = pickle.load(sys.stdin)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(config['dist_log_level'])
    forking.prepare(config)

    # create server for a `HostManager` object
    server = managers.Server(
        HostManager._registry, ('', 0), config['authkey']
    )
    this_process = current_process()
    this_process._server = server

    # report server address and number of cpus back to parent
    parent = connection.Client(
        config['parent_address'], authkey=config['authkey']
    )
    parent.send((config['index'], server.address, slot_count))
    parent.close()

    # set name etc
    this_process.set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # register a cleanup function which removes the start-up directory
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')

    util.Finalize(None, cleanup, args=[config['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', config['dir'])
    server.serve_forever()