| # |
| # Module to allow spawning of processes on foreign host |
| # |
| # Depends on `multiprocessing` package -- tested with `processing-0.60` |
| # |
| |
| __all__ = ['Cluster', 'Host', 'get_logger', 'current_process'] |
| |
| # |
| # Imports |
| # |
| |
| import sys |
| import os |
| import tarfile |
| import shutil |
| import subprocess |
| import logging |
| import itertools |
| import queue |
| |
| try: |
| import pickle as pickle |
| except ImportError: |
| import pickle |
| |
| from multiprocessing import Process, current_process, cpu_count |
| from multiprocessing import util, managers, connection, forking, pool |
| |
| # |
| # Logging |
| # |
| |
def get_logger():
    '''
    Return the logger object used by this module
    '''
    return _logger
| |
# Module-wide logger.  Propagation is switched off so that records emitted
# through the stream handler attached below are not also handed on to the
# handlers of ancestor loggers.  (The attribute is spelled `propagate`; a
# misspelled name would merely create an unused attribute.)
_logger = logging.getLogger('distributing')
_logger.propagate = 0

util.fix_up_logger(_logger)
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

# Shorthands used by the rest of the module
info = _logger.info
debug = _logger.debug
| |
| # |
| # Get number of cpus |
| # |
| |
# Default number of slots for the local machine.  `cpu_count()` reports an
# undeterminable cpu count by raising `NotImplementedError` (`NotImplemented`
# is a constant, not an exception class), in which case assume one slot.
try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1
| |
| # |
| # Manager type which spawns subprocesses |
| # |
| |
class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # placeholder name; `from_address()` replaces it with the real one
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Return a `_RemoteProcess` object for running `target` on this host

        `target`, `args` and `kwargs` are pickled together and handed to
        `_RemoteProcess` along with the base name of the `__main__`
        module's file (or `None` if `__main__` has no `__file__`).  If no
        `name` is given one is derived from the host name and the
        identity reported by the new process object.  `group` is accepted
        but not used.
        '''
        main_module = sys.modules['__main__']
        main_path = None
        if hasattr(main_module, '__file__'):
            main_path = os.path.basename(main_module.__file__)

        payload = pickle.dumps((target, args, kwargs))
        proc = self._RemoteProcess(payload, main_path)

        if name is None:
            # e.g. 'Host-somewhere:1234' -> 'somewhere:1234/Process-1:2'
            host_part = self._name.split('Host-')[-1]
            identity = ':'.join(map(str, proc.get_identity()))
            name = (host_part + '/Process-%s') % identity
        proc.set_name(name)
        return proc

    @classmethod
    def from_address(cls, address, authkey):
        '''
        Return a manager object for the server at `address`

        A 'dummy' request is sent to the address first, then the manager
        is marked as started, named after its address and given a
        `shutdown` finalizer which sends a 'shutdown' request.
        '''
        host = cls(address, authkey)
        managers.transact(address, authkey, 'dummy')
        host._state.value = managers.State.STARTED
        host._name = 'Host-%s:%s' % host.address
        host.shutdown = util.Finalize(
            host, HostManager._finalize_host,
            args=(host._address, host._authkey, host._name),
            exitpriority=-10
            )
        return host

    @staticmethod
    def _finalize_host(address, authkey, name):
        # `name` is part of the finalizer arguments but is not needed here
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
| |
| # |
| # Process subclass representing a process on (possibly) a remote machine |
| # |
| |
class RemoteProcess(Process):
    '''
    Represents a process started on a remote host

    `data` is the pickled `(target, args, kwargs)` triple; it is only
    unpickled when the process is bootstrapped.  `main_path` is the
    bare file name of the main module, or a false value if there is none.
    '''
    def __init__(self, data, main_path):
        # only a bare file name is accepted, never a path with directories
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self, *args, **kwds):
        # Any arguments are passed straight through so that this override
        # works whatever signature the base class `_bootstrap()` has.
        forking.prepare({'main_path': self._main_path})
        # NOTE(review): `self._data` is unpickled here, so it must come
        # from a trusted source.
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self, *args, **kwds)

    def get_identity(self):
        '''
        Return the identity tuple of the process object
        '''
        return self._identity

    def set_name(self, name):
        '''
        Set the name of the process

        Provided as an ordinary method (like `get_identity()`) so that the
        name can be set with a method call, as `HostManager.Process()` does.
        '''
        self.name = name
| |
# Register `RemoteProcess` with `HostManager` under the type id
# `_RemoteProcess`, which is the name `HostManager.Process()` calls.
HostManager.register('_RemoteProcess', RemoteProcess)
| |
| # |
| # A Pool class that uses a cluster |
| # |
| |
class DistributedPool(pool.Pool):
    '''
    Pool which gets its worker processes and its queues from a cluster
    '''
    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        self.Process = cluster.Process
        # default to one worker per slot of the cluster
        if not processes:
            processes = len(cluster)
        pool.Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        # both queues are created through the cluster
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # replace whatever is queued with `size` copies of `None`;
        # `task_handler` is not used
        sentinels = [None] * size
        inqueue.set_contents(sentinels)
| |
| # |
| # Manager type which starts host managers on other machines |
| # |
| |
def LocalProcess(**kwds):
    '''
    Return a `Process` created from `kwds` whose name is prefixed with
    'localhost/'
    '''
    p = Process(**kwds)
    # `Process` exposes its name as the `name` attribute, which is also
    # how it is read on the right hand side
    p.name = 'localhost/' + p.name
    return p
| |
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        '''
        `hostlist` is a sequence of `Host` objects.  `modules` is a
        sequence of names of already imported modules whose source files
        are handed to each host's `_start_manager()`; this module is
        always included.
        '''
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        # work on a copy so that the caller's sequence is left untouched
        modules = list(modules)
        if __name__ not in modules:
            modules.append(__name__)
        self._modules = modules
        files = []
        for name in modules:
            path = sys.modules[name].__file__
            # ship the source file rather than the compiled one
            if path.endswith(('.pyc', '.pyo')):
                path = path[:-4] + '.py'
            files.append(os.path.abspath(path))
        self._files = files

    def start(self):
        '''
        Start the cluster's own manager and then a manager for each host
        '''
        managers.SyncManager.start(self)

        # listener on which the remote hosts report back
        listener = connection.Listener(family='AF_INET',
                                       authkey=self._authkey)
        try:
            for i, host in enumerate(self._hostlist):
                host._start_manager(i, self._authkey, listener.address,
                                    self._files)

            for host in self._hostlist:
                if host.hostname != 'localhost':
                    # Accept one report per non-local host.  The report
                    # carries the index of the host which sent it, so
                    # the reports may arrive in any order.
                    conn = listener.accept()
                    try:
                        i, address, cpus = conn.recv()
                    finally:
                        conn.close()
                    other_host = self._hostlist[i]
                    other_host.manager = HostManager.from_address(
                        address, self._authkey)
                    other_host.slots = other_host.slots or cpus
                    other_host.Process = other_host.manager.Process
                else:
                    host.slots = host.slots or slot_count
                    host.Process = LocalProcess
        finally:
            # the listener is only needed while the hosts report back
            listener.close()

        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # Keep the `shutdown` instance attribute set up by the base class
        # `start()` and remove it from the instance so that the
        # `shutdown()` method below is used instead.
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        '''
        Shut down the managers of all non-local hosts, then the cluster's own
        '''
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Return a process object created by the next slot, taking the
        slots in rotation
        '''
        slot = next(self._slot_iterator)
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''
        Return a `DistributedPool` which uses this cluster
        '''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
| |
| # |
| # Queue subclass used by distributed pool |
| # |
| |
class SettableQueue(queue.Queue):
    '''
    Queue whose whole contents can be replaced using `set_contents()`
    '''
    def empty(self):
        # looks at the underlying container directly, without the lock
        return not self.queue

    def full(self):
        # looks at the underlying container directly, without the lock
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    def set_contents(self, contents):
        '''
        Discard everything queued, queue the items of `contents` instead
        and wake up every thread waiting for an item
        '''
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        with self.not_empty:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notify_all()
| |
# Register `SettableQueue` with `Cluster` under the type id
# `_SettableQueue`, which is the name `DistributedPool._setup_queues()` calls.
Cluster.register('_SettableQueue', SettableQueue)
| |
| # |
| # Class representing a notional cpu in the cluster |
| # |
| |
class Slot(object):
    '''
    One slot on a host

    Keeps a reference to the host and to the host's `Process` callable.
    '''
    def __init__(self, host):
        self.host = host
        self.Process = host.Process
| |
| # |
| # Host |
| # |
| |
class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        '''
        Start a host manager on this host; does nothing for "localhost"

        `files` are copied to a temporary directory on the host, then
        `main()` of the `distributing` module is run there over ssh.  The
        information it needs (including `index`, `authkey` and `address`,
        the address it should report back to) is pickled to its stdin.
        '''
        if self.hostname == 'localhost':
            return

        tempdir = copy_to_remote_temporary_directory(self.hostname, files)
        debug('startup files copied to %s:%s', self.hostname, tempdir)
        # the double quotes inside the last argument keep the code
        # together as one argument of the remote python command
        p = subprocess.Popen(
            ['ssh', self.hostname, 'python', '-c',
             '"import os; os.chdir(%r); '
             'from distributing import main; main()"' % tempdir],
            stdin=subprocess.PIPE
            )
        # The key is converted with `bytes()` rather than `str()`: the
        # result is a plain byte string holding the key itself, whereas
        # `str()` of a byte string gives its "b'...'" representation on
        # Python 3.
        data = dict(
            name='BoostrappingHost', index=index,
            dist_log_level=_logger.getEffectiveLevel(),
            dir=tempdir, authkey=bytes(authkey), parent_address=address
            )
        pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
        p.stdin.close()
| |
| # |
| # Copy files to remote directory, returning name of directory |
| # |
| |
# Code run by the remote python: it unpacks a gzipped tar stream read from
# stdin into a fresh temporary directory and prints that directory's name.
#
# The code is wrapped in double quotes because it is passed as a single
# argument of the remote command line, so it must not contain double
# quotes itself.  The tar stream is read from `sys.stdin.buffer` where
# that exists (binary stdin) and from `sys.stdin` otherwise, and `print`
# is written as a call, so the code is valid for both old and new pythons.
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=getattr(sys.stdin, 'buffer', sys.stdin), mode='r|gz')
for ti in tf:
    tf.extract(ti)
print(tempdir)
"'''
| |
def copy_to_remote_temporary_directory(host, files):
    '''
    Copy `files` to a new temporary directory on `host` using ssh

    The files are sent as a gzipped tar stream, each stored under its
    base name, to `unzip_code` running on the host.  Returns what that
    code printed -- the name of the directory -- with trailing
    whitespace removed, as read from the pipe.
    '''
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    try:
        tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
        for name in files:
            tf.add(name, os.path.basename(name))
        tf.close()
        # closing stdin tells the remote side that the archive is complete
        p.stdin.close()
        return p.stdout.read().rstrip()
    finally:
        # Always release the pipes and reap the ssh process.  stdin has
        # to be closed first, otherwise the remote side could keep
        # waiting for more input and `wait()` would never return.
        if not p.stdin.closed:
            p.stdin.close()
        p.stdout.close()
        p.wait()
| |
| # |
| # Code which runs a host manager |
| # |
| |
def main():
    '''
    Run a host manager server

    Reads a pickled dictionary from stdin, reports the address of the new
    server and the local slot count back to the address given in the
    dictionary, and then serves requests forever.  The directory named
    in the dictionary is removed by a finalizer.
    '''
    # get data from parent over stdin
    #
    # Pickle data is binary, so read from the underlying binary stream
    # where `sys.stdin` has one.
    # NOTE(review): unpickling executes whatever the sender chooses, so
    # stdin must only ever be fed by the trusted parent process.
    stdin = getattr(sys.stdin, 'buffer', sys.stdin)
    data = pickle.load(stdin)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc
    current_process().name = 'Host-%s:%s' % server.address
    util._run_after_forkers()

    # register a cleanup function
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()