%pythoncode %{ import socket import time import sys import os import re import random import socket import xdrlib try: import cPickle as pickle except ImportError: import pickle from IMP.parallel import slavestate from IMP.parallel.subproc import _run_background, _Popen4 from IMP.parallel.util import _ListenSocket, _ErrorWrapper from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper from IMP.parallel.util import _SetPathAction # Save sys.path at import time, so that slaves can import using the same # path that works for the master imports _import_time_path = sys.path[:] class Error(Exception): """Base class for all errors specific to the parallel module""" pass class _NoMoreTasksError(Error): pass class NoMoreSlavesError(Error): """Error raised if all slaves failed, so tasks cannot be run""" pass class NetworkError(Error): """Error raised if a problem occurs with the network""" pass class RemoteError(Error): """Error raised if a slave has an unhandled exception""" def __init__(self, exc, traceback, slave): self.exc = exc self.traceback = traceback self.slave = slave def __str__(self): errstr = str(self.exc.__class__).replace("exceptions.", "") return "%s: %s from %s\nRemote traceback:\n%s" \ % (errstr, str(self.exc), str(self.slave), self.traceback) class _Communicator(object): """Simple support for sending Python pickled objects over the network""" def __init__(self): self._socket = None self._ibuffer = '' def _send(self, obj): p = xdrlib.Packer() try: p.pack_string(pickle.dumps(obj, -1)) # Python < 2.5 can fail trying to send Inf or NaN floats in binary # mode, so fall back to the old protocol in this case: except SystemError: p.pack_string(pickle.dumps(obj, 0)) self._socket.sendall(p.get_buffer()) def get_data_pending(self): return len(self._ibuffer) > 0 def _recv(self): while True: try: obj, self._ibuffer = self._unpickle(self._ibuffer) if isinstance(obj, _ErrorWrapper): raise RemoteError(obj.obj, obj.traceback, self) else: return obj except (IndexError, EOFError): try: data = self._socket.recv(4096) except socket.error, detail: raise NetworkError("Connection lost to %s: %s" \ % (str(self), str(detail))) if len(data) > 0: self._ibuffer += data else: raise NetworkError("%s closed connection" % str(self)) def _unpickle(self, ibuffer): p = xdrlib.Unpacker(ibuffer) obj = p.unpack_string() return (pickle.loads(obj), ibuffer[p.get_position():]) class Slave(_Communicator): """Representation of a single slave. Each slave uses a single thread of execution (i.e. a single CPU core) to run tasks sequentially. Slave is an abstract class; instead of using this class directly, use a subclass such as LocalSlave or SGEQsubSlaveArray.""" def __init__(self): _Communicator.__init__(self) self._state = slavestate.init self._context = None self._task = None self.update_contact_time() def _start(self, command, unique_id, output): """Start the slave running on the remote host; override in subclasses""" self._state = slavestate.started def _accept_connection(self, sock): self._socket = sock self._state = slavestate.connected self.update_contact_time() def _set_python_search_path(self, path): self._send(_SetPathAction(path)) def update_contact_time(self): self.last_contact_time = time.time() def get_contact_timed_out(self, timeout): return (time.time() - self.last_contact_time) > timeout def _start_task(self, task, context): if not self.ready_for_task(context) and not self.ready_for_task(None): raise TypeError("%s not ready for task" % str(self)) if self._context != context: self._context = context self._send(_ContextWrapper(context._startup)) self._state = slavestate.running_task self._task = task self._send(_TaskWrapper(task)) def _get_finished_task(self): while True: r = self._recv() self.update_contact_time() if isinstance(r, _HeartBeat): if not self.get_data_pending(): return None else: break task = self._task task._results = r self._task = None self._state = slavestate.connected return task def _kill(self): task = self._task self._task = None self._context = None self._state = slavestate.dead return task def ready_to_start(self): return self._state == slavestate.init def ready_for_task(self, context): return self._state == slavestate.connected \ and self._context == context def running_task(self, context): return self._state == slavestate.running_task \ and self._context == context class SlaveArray(object): """Representation of an array of slaves. This is similar to Slave, except that it represents a collection of slaves that are controlled together, such as a batch submission system array job on a compute cluster.""" def _get_slaves(self): """Return a list of Slave objects contained within this array""" pass def _start(self): """Do any necessary startup after all contained Slaves have started""" pass class LocalSlave(Slave): """A slave running on the same machine as the master.""" def _start(self, command, unique_id, output): Slave._start(self, command, unique_id, output) cmdline = "%s %s" % (command, unique_id) _run_background(cmdline, output) def __repr__(self): return "" class _SGEQsubSlave(Slave): def __init__(self, array): Slave.__init__(self) self._jobid = None self._array = array def _start(self, command, unique_id, output): Slave._start(self, command, unique_id, output) self._array._slave_started(unique_id, output, self) def __repr__(self): jobid = self._jobid if jobid is None: jobid = '(unknown)' return "" % jobid class SGEQsubSlaveArray(SlaveArray): """An array of slaves on a Sun Grid Engine system, started with 'qsub'. To use this class, the master process must be running on a machine that can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this is termed a 'submit host' by SGE). The class starts an SGE job array (every slave has the same SGE job ID, but a different task ID). @param numslave The number of slaves, which correponds to the number of tasks in the SGE job. @param options A string of SGE options that are passed on the 'qsub' command line. This is added to standard_options. """ standard_options = '-j y -cwd -r n -o sge-errors' def __init__(self, numslave, options): self._numslave = numslave self._options = options self._starting_slaves = [] self._jobid = None def _get_slaves(self): """Return a list of Slave objects contained within this array""" return [_SGEQsubSlave(self) for x in range(self._numslave)] def _slave_started(self, command, output, slave): self._starting_slaves.append((command, output, slave)) def _start(self, command): qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \ (self._options, self.standard_options, len(self._starting_slaves)) print qsub a = _Popen4(qsub) (inp, out) = (a.stdin, a.stdout) slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves]) slave_out = " ".join([repr(s[1]) for s in self._starting_slaves]) inp.write("#!/bin/sh\n") inp.write("uid=( '' %s )\n" % slave_uid) inp.write("out=( '' %s )\n" % slave_out) inp.write("myuid=${uid[$SGE_TASK_ID]}\n") inp.write("myout=${out[$SGE_TASK_ID]}\n") inp.write("%s $myuid > $myout 2>&1\n" % command) inp.close() outlines = out.readlines() out.close() for line in outlines: print line.rstrip('\r\n') a.require_clean_exit() self._set_jobid(outlines) self._starting_slaves = [] def _set_jobid(self, outlines): """Try to figure out the job ID from the SGE qsub output""" if len(outlines) > 0: m = re.compile(r"\d+").search(outlines[0]) if m: self._jobid = int(m.group()) for (num, slave) in enumerate(self._starting_slaves): slave[2]._jobid = "%d.%d" % (self._jobid, num+1) class _SGEPESlave(Slave): def __init__(self, host): Slave.__init__(self) self._host = host def _start(self, command, unique_id, output): Slave._start(self, command, unique_id, output) cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id) _run_background(cmdline, output) def __repr__(self): return "" % self._host class SGEPESlaveArray(SlaveArray): """An array of slaves in a Sun Grid Engine system parallel environment. In order to use this class, the master must be run via Sun Grid Engine's 'qsub' command and submitted to a parallel environment using the qsub -pe option. This class will start slaves on every node in the parallel environment (including the node running the master). Each slave is started using the 'qrsh' command with the '-inherit' option.""" def _get_slaves(self): slaves = [] pe = os.environ['PE_HOSTFILE'] fh = open(pe, "r") while True: line = fh.readline() if line == '': break (node, num, queue) = line.split(None, 2) for i in range(int(num)): slaves.append(_SGEPESlave(node)) # Replace first slave with a local slave, as this is ourself, and SGE # won't let us start this process with qrsh (as we are already # occupying the slot) if len(slaves) > 0: slaves[0] = LocalSlave() return slaves class Context(object): """A collection of tasks that run in the same environment. Context objects are typically created by calling Manager::get_context(). """ def __init__(self, manager, startup=None): self._manager = manager self._startup = startup self._tasks = [] def add_task(self, task): """Add a task to this context. Tasks are any Python callable object that can be pickled (e.g. a function or a class that implements the __call__ method). When the task is run on the slave its arguments are the return value from this context's startup function.""" self._tasks.append(task) def get_results_unordered(self): """Run all of the tasks on available slaves, and return results. If there are more tasks than slaves, subsequent tasks are started only once a running task completes: each slave only runs a single task at a time. As each task completes, the return value(s) from the task callable are returned from this method, as a Python generator. Note that the results are returned in the order that tasks complete, which may not be the same as the order they were submitted in. \exception NoMoreSlavesError there are no slaves available to run the tasks (or they all failed during execution). \exception RemoteError a slave encountered an unhandled exception. \exception NetworkError the master lost (or was unable to establish) communication with any slave. """ return self._manager._get_results_unordered(self) class Manager(object): """Manages slaves and contexts. @param python If not None, the command to run to start a Python interpreter that can import the IMP module. Otherwise, the same interpreter that the master is currently using is used. This is passed to the shell, so a full command line (including multiple words separated by spaces) can be used if necessary. @param host The hostname that slaves use to connect back to the master. If not specified, the master machine's primary IP address is used. On multi-homed machines, such as compute cluster headnodes, this may need to be changed to allow all slaves to reach the master (typically the name of the machine's internal network address is needed). If only running local slaves, 'localhost' can be used to prohibit connections across the network. """ connect_timeout = 7200 # Note: must be higher than that in slave_handler._HeartBeatThread heartbeat_timeout = 7200 def __init__(self, python=None, host=None): if python is None: self._python = sys.executable else: self._python = python self._host = host self._all_slaves = [] self._starting_slaves = {} self._slave_arrays = [] if host: self._host = host else: # Get primary IP address of this machine self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0] self._listen_sock = _ListenSocket(self._host, self.connect_timeout) def add_slave(self, slave): """Add a Slave object.""" if hasattr(slave, '_get_slaves'): self._slave_arrays.append(slave) else: self._all_slaves.append(slave) def get_context(self, startup=None): """Create and return a new Context in which tasks can be run. @param startup If not None, a callable (Python function or class that implements the __call__ method) that sets up the slave to run tasks. This method is only called once per slave. The return values from this method are passed to the task object when it runs on the slave. """ return Context(self, startup) def _get_results_unordered(self, context): """Run all of a context's tasks, and yield results""" self._send_tasks_to_slaves(context) try: while True: for task in self._get_finished_tasks(context): tasks_queued = len(context._tasks) yield task._results # If the user added more tasks while processing these # results, make sure they get sent off to the slaves if len(context._tasks) > tasks_queued: self._send_tasks_to_slaves(context) except _NoMoreTasksError: return def _start_all_slaves(self): for array in self._slave_arrays: self._all_slaves.extend(array._get_slaves()) command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" " "%s %d") % (self._python, self._host, self._listen_sock.port) for (num, slave) in enumerate(self._all_slaves): if slave.ready_to_start(): unique_id = self._get_unique_id(num) self._starting_slaves[unique_id] = slave output = 'slave%d.output' % num slave._start(command, unique_id, output) for array in self._slave_arrays: array._start(command) self._slave_arrays = [] def _get_unique_id(self, num): id = "%d:" % num for i in range(0, 8): id += chr(random.randint(0, 25) + ord('A')) return id def _send_tasks_to_slaves(self, context): self._start_all_slaves() # Prefer slaves that already have the task context available_slaves = [a for a in self._all_slaves if a.ready_for_task(context)] + \ [a for a in self._all_slaves if a.ready_for_task(None)] for slave in available_slaves: if len(context._tasks) == 0: break else: self._send_task_to_slave(slave, context) def _send_task_to_slave(self, slave, context): if len(context._tasks) == 0: return t = context._tasks[0] try: slave._start_task(t, context) context._tasks.pop(0) except socket.error, detail: slave._kill() def _get_finished_tasks(self, context): while True: events = self._get_network_events(context) if len(events) == 0: self._kill_all_running_slaves(context) for event in events: task = self._process_event(event, context) if task: yield task def _process_event(self, event, context): if event == self._listen_sock: # New slave just connected (conn, addr) = self._listen_sock.accept() new_slave = self._accept_slave(conn, context) elif event.running_task(context): try: task = event._get_finished_task() if task: self._send_task_to_slave(event, context) return task else: # the slave sent back a heartbeat self._kill_timed_out_slaves(context) except NetworkError, detail: task = event._kill() print "Slave %s failed (%s): rescheduling task %s" \ % (str(event), str(detail), str(task)) context._tasks.append(task) self._send_tasks_to_slaves(context) else: pass # Slave not running a task def _kill_timed_out_slaves(self, context): timed_out = [a for a in self._all_slaves if a.running_task(context) \ and a.get_contact_timed_out(self.heartbeat_timeout)] for slave in timed_out: task = slave._kill() print("Did not hear from slave %s in %d seconds; rescheduling " "task %s" % (str(slave), self.heartbeat_timeout, str(task))) context._tasks.append(task) if len(timed_out) > 0: self._send_tasks_to_slaves(context) def _kill_all_running_slaves(self, context): running = [a for a in self._all_slaves if a.running_task(context)] for slave in running: task = slave._kill() context._tasks.append(task) raise NetworkError("Did not hear from any running slave in " "%d seconds" % self.heartbeat_timeout) def _accept_slave(self, sock, context): sock.setblocking(True) identifier = sock.recv(1024) if identifier and identifier in self._starting_slaves: slave = self._starting_slaves.pop(identifier) slave._accept_connection(sock) print "Identified slave %s " % str(slave) self._init_slave(slave) self._send_task_to_slave(slave, context) return slave else: print "Ignoring request from unknown slave" def _init_slave(self, slave): if _import_time_path[0] != '': slave._set_python_search_path(_import_time_path[0]) if sys.path[0] != '' and sys.path[0] != _import_time_path[0]: slave._set_python_search_path(sys.path[0]) def _get_network_events(self, context): running = [a for a in self._all_slaves if a.running_task(context)] if len(running) == 0: if len(context._tasks) == 0: raise _NoMoreTasksError() elif len(self._starting_slaves) == 0: raise NoMoreSlavesError("Ran out of slaves to run tasks") # Otherwise, wait for starting slaves to connect back and get tasks return util._poll_events(self._listen_sock, running, self.heartbeat_timeout) %}