add missing monitor_db code
git-svn-id: http://test.kernel.org/svn/autotest/trunk@1264 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db b/scheduler/monitor_db
new file mode 100755
index 0000000..28316fe
--- /dev/null
+++ b/scheduler/monitor_db
@@ -0,0 +1,1011 @@
+#!/usr/bin/python -u
+
+"""
+Autotest scheduler
+"""
+__author__ = "Paul Turner <pjt@google.com>"
+
+import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
+import optparse, signal, smtplib, socket
+RESULTS_DIR = '.'
+AUTOSERV_NICE_LEVEL = 10
+
+AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
+
+if os.environ.has_key('AUTOTEST_DIR'):
+ AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
+AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
+AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
+
+if AUTOTEST_SERVER_DIR not in sys.path:
+ sys.path.insert(0, AUTOTEST_SERVER_DIR)
+
+_connection = None
+_cursor = None
+_shutdown = False
+_notify_email = None
+
+
def main():
    """Command-line entry point for the scheduler.

    Expects one positional argument (the results directory).  Sets up
    logging/notification globals, connects to the database, then ticks the
    Dispatcher every 20 seconds until a SIGINT flags shutdown; finally the
    dispatcher is drained and the DB connection dropped.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--no-recover', help='Skip machine/job recovery ' +
                      'step [for multiple monitors/rolling upgrades]',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    init(options.logfile)
    dispatcher = Dispatcher(do_recover=not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        # Top-level boundary handler: record (and optionally mail) the
        # stacktrace, then fall through to disconnect below.
        log_stacktrace("Uncaught exception; terminating monitor_db")

    disconnect()
+
+
+def handle_sigint(signum, frame):
+ global _shutdown
+ _shutdown = True
+ print "Shutdown request received."
+
+
+def init(logfile):
+ if logfile:
+ enable_logging(logfile)
+ print "%s> dispatcher starting" % time.strftime("%X %x")
+ print "My PID is %d" % os.getpid()
+
+ os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
+ connect()
+
+ print "Setting signal handler"
+ signal.signal(signal.SIGINT, handle_sigint)
+
+ print "Connected! Running..."
+
+
+def enable_logging(logfile):
+ out_file = logfile
+ err_file = "%s.err" % logfile
+ print "Enabling logging to %s (%s)" % (out_file, err_file)
+ out_fd = open(out_file, "a", buffering=0)
+ err_fd = open(err_file, "a", buffering=0)
+
+ os.dup2(out_fd.fileno(), sys.stdout.fileno())
+ os.dup2(err_fd.fileno(), sys.stderr.fileno())
+
+ sys.stdout = out_fd
+ sys.stderr = err_fd
+
+
def idle_hosts():
    """Return Host objects for every unlocked, Ready (or status-less) host
    that has no active queue entry but does have work available — either a
    directly-assigned entry or a metahost entry matching one of its labels.
    """
    _cursor.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    return [Host(row=r) for r in _cursor.fetchall()]
+
+
def _read_config_lines(path, count):
    """Return the first `count` rstripped lines of `path`, or None when the
    file cannot be opened.  Helper for connect()."""
    try:
        config = open(path, 'r')
    except IOError:
        return None
    try:
        return [config.readline().rstrip() for _ in xrange(count)]
    finally:
        # Bug fix: the original never closed these config file handles.
        config.close()


def connect():
    """Open the module-global MySQL connection and cursor.

    Connection parameters come from optional dotfiles located next to this
    script, falling back to hard-coded defaults:
      .database     -> host, schema (one value per line)
      .priv_login   -> privileged user, password
      .unpriv_login -> unprivileged user, password (tried second)
    """
    path = os.path.dirname(os.path.abspath(sys.argv[0]))

    database = _read_config_lines(os.path.join(path, '.database'), 2)
    if database is None:
        database = ['localhost', 'autotest_web']
    DB_HOST, DB_SCHEMA = database

    # Prefer the privileged login; fall back to unprivileged, then defaults.
    login = (_read_config_lines(os.path.join(path, '.priv_login'), 2) or
             _read_config_lines(os.path.join(path, '.unpriv_login'), 2) or
             ['autotest', 'password'])
    DB_USER, DB_PASS = login

    global _connection, _cursor
    _connection = MySQLdb.connect(
        host=DB_HOST,
        user=DB_USER,
        passwd=DB_PASS,
        db=DB_SCHEMA
    )
    # Autocommit so status updates become visible to the frontend promptly.
    _connection.autocommit(True)
    _cursor = _connection.cursor()
+
+
def disconnect():
    """Close the module-global DB connection.

    Robustness fix: safe to call even when connect() never succeeded (the
    main() exception path reaches here unconditionally), in which case
    _connection is still None and there is nothing to close.
    """
    global _connection, _cursor
    if _connection is not None:
        _connection.close()
    _connection = None
    _cursor = None
+
+
def parse_results(results_dir, flags=""):
    """Launch the tko parser over `results_dir` as a background shell job,
    capturing its combined output in .parse.log inside that directory.

    NOTE(review): the command line is built by string interpolation and run
    through a shell; results_dir and flags are internal, trusted values.
    """
    parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    logfile = os.path.join(results_dir, '.parse.log')
    os.system("%s %s -r -o %s > %s 2>&1 &" % (parse, flags, results_dir,
                                              logfile))
+
+
def log_stacktrace(reason):
    """Write the current exception plus `reason` to stderr and, when
    --notify was given, email it to the configured address.

    Must be called from inside an `except` block (relies on sys.exc_info()).
    """
    # Idiom fix: the original shadowed the builtins `type` and `str`.
    exc_type, exc_value, exc_tb = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                   time.strftime("%X %x"))
    message += ''.join(traceback.format_exception(exc_type, exc_value,
                                                  exc_tb))

    sys.stderr.write("\n%s\n" % message)

    if _notify_email:
        sender = "monitor_db"
        subject = "monitor_db exception"
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            sender, _notify_email, subject, message)
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(sender, _notify_email, msg)
        finally:
            # Bug fix: always QUIT the SMTP session, even when sendmail
            # raises, so the connection is not leaked.
            mailer.quit()
+
+
class Dispatcher:
    """Top-level scheduling loop.

    Each tick() finds idle hosts with queued work, turns their queue
    entries into Agents, and polls every live Agent.  On start-up it can
    recover state left behind by a previous (crashed) monitor.
    """

    def __init__(self, do_recover=True):
        # Agents currently being polled by _handle_agents().
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        """Stop picking up new work; keep ticking until all agents drain."""
        print "Shutting down!"
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        """One scheduler iteration: queue new work (unless shutting down),
        then poll every active agent."""
        if not self.shutting_down:
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        # Register the agent and give it a back-pointer so its tasks can
        # schedule follow-on agents (e.g. synchronous job launch).
        self._agents.append(agent)
        agent.dispatcher = self


    def _recover_lost(self):
        """Crash recovery: requeue entries that were active when a previous
        monitor died, and re-verify hosts stuck in a non-Ready state."""
        _cursor.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete""")
        if _cursor.rowcount:
            queue_entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
            for queue_entry in queue_entries:
                job = queue_entry.job
                if job.is_synchronous():
                    # Synchronous jobs run as a unit: restart every entry.
                    for child_entry in job.get_host_queue_entries():
                        child_entry.requeue()
                else:
                    queue_entry.requeue()
                queue_entry.clear_results_dir()

        _cursor.execute("""SELECT * FROM hosts
                           WHERE status != 'Ready' AND NOT locked""")
        if _cursor.rowcount:
            hosts = [Host(row=i) for i in _cursor.fetchall()]
            for host in hosts:
                verify_task = VerifyTask(host = host)
                self.add_agent(Agent(tasks = [verify_task]))


    def _find_more_work(self):
        print "finding work"

        # At most 100 new agents are started per tick to bound tick time.
        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started>=100:
                                return
                        # Only the first entry per host is attempted.
                        break
                    except:
                        # A broken entry must not wedge the scheduler:
                        # mark it Failed and log the traceback.
                        next.set_status('Failed')

#                       if next.host:
#                           next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _handle_agents(self):
        # Poll every agent; keep only those with work remaining.
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running
+
+
+class RunMonitor(object):
+ def __init__(self, cmd, nice_level = None, log_file = None):
+ self.nice_level = nice_level
+ self.log_file = log_file
+ self.proc = self.run(cmd)
+
+ def run(self, cmd):
+ if self.nice_level:
+ nice_cmd = ['nice','-n', str(self.nice_level)]
+ nice_cmd.extend(cmd)
+ cmd = nice_cmd
+
+ out_file = None
+ if self.log_file:
+ try:
+ out_file = open(self.log_file, 'a')
+ out_file.write("\n%s\n" % ('*'*80))
+ out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
+ out_file.write("%s\n" % ('*'*80))
+ except:
+ pass
+
+ if not out_file:
+ out_file = open('/dev/null', 'w')
+
+ in_devnull = open('/dev/null', 'r')
+ print "cmd = %s" % cmd
+ print "path = %s" % os.getcwd()
+
+ proc = subprocess.Popen(cmd, stdout=out_file,
+ stderr=subprocess.STDOUT, stdin=in_devnull)
+ out_file.close()
+ in_devnull.close()
+ return proc
+
+
+ def kill(self):
+ self.proc.kill()
+
+
+ def exit_code(self):
+ return self.proc.poll()
+
+
+class Agent(object):
+ def __init__(self, tasks):
+ self.active_task = None
+ self.queue = Queue.Queue(0)
+ self.dispatcher = None
+
+ for task in tasks:
+ self.add_task(task)
+
+
+ def add_task(self, task):
+ self.queue.put_nowait(task)
+ task.agent = self
+
+
+ def tick(self):
+ print "agent tick"
+ if self.active_task and not self.active_task.is_done():
+ self.active_task.poll()
+ else:
+ self._next_task();
+
+
+ def _next_task(self):
+ print "agent picking task"
+ if self.active_task:
+ assert self.active_task.is_done()
+
+ if not self.active_task.success and self.active_task.failure_tasks:
+ self.queue = Queue.Queue(0)
+ for task in self.active_task.failure_tasks:
+ self.add_task(task)
+
+ self.active_task = None
+ if not self.is_done():
+ self.active_task = self.queue.get_nowait()
+ if self.active_task:
+ self.active_task.start()
+
+
+ def is_done(self):
+ return self.active_task == None and self.queue.empty()
+
+
+ def start(self):
+ assert self.dispatcher
+
+ self._next_task()
+
+
+class AgentTask(object):
+ def __init__(self, cmd, failure_tasks = None):
+ self.done = False
+ self.failure_tasks = failure_tasks
+ self.started = False
+ self.cmd = cmd
+ self.agent = None
+
+
+ def poll(self):
+ print "poll"
+ if hasattr(self, 'monitor'):
+ self.tick(self.monitor.exit_code())
+ else:
+ self.finished(False)
+
+
+ def tick(self, exit_code):
+ if exit_code==None:
+ return
+# print "exit_code was %d" % exit_code
+ if exit_code == 0:
+ success = True
+ else:
+ success = False
+
+ self.finished(success)
+
+
+ def is_done(self):
+ return self.done
+
+
+ def finished(self, success):
+ self.done = True
+ self.success = success
+ self.epilog()
+
+
+ def prolog(self):
+ pass
+
+
+ def epilog(self):
+ pass
+
+
+ def start(self):
+ assert self.agent
+
+ if not self.started:
+ self.prolog()
+ self.run()
+
+ self.started = True
+
+
+ def abort(self):
+ self.monitor.kill()
+
+
+ def run(self):
+ if self.cmd:
+ print "agent starting monitor"
+
+ log_file = None
+ if hasattr(self, 'host'):
+ log_file = os.path.join(os.path.join(RESULTS_DIR, 'hosts'), self.host.hostname)
+
+ self.monitor = RunMonitor(self.cmd, nice_level = AUTOSERV_NICE_LEVEL, log_file = log_file)
+
+
+class RepairTask(AgentTask):
+ def __init__(self, host):
+ cmd = ['autoserv', '-n', '-R', '-m', host.hostname]
+ self.host = host
+ AgentTask.__init__(self, cmd)
+
+ def prolog(self):
+ print "repair_task starting"
+ self.host.set_status('Repairing')
+
+
+ def epilog(self):
+ if self.success:
+ status = 'Ready'
+ else:
+ status = 'Repair Failed'
+
+ self.host.set_status(status)
+
+
+class VerifyTask(AgentTask):
+ def __init__(self, queue_entry=None, host=None):
+ assert bool(queue_entry) != bool(host)
+
+ self.host = host or queue_entry.host
+ self.queue_entry = queue_entry
+
+ self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
+ cmd = ['autoserv', '-n', '-v', '-m', self.host.hostname,
+ '-r', self.temp_results_dir]
+
+ AgentTask.__init__(self, cmd, failure_tasks = [RepairTask(self.host)])
+
+
+ def prolog(self):
+ print "starting verify on %s" % (self.host.hostname)
+ if self.queue_entry:
+ self.queue_entry.set_status('Verifying')
+ self.host.set_status('Verifying')
+
+
+ def epilog(self):
+ if self.queue_entry and (self.success or
+ not self.queue_entry.meta_host):
+ self.move_results()
+ else:
+ shutil.rmtree(self.temp_results_dir)
+
+ if self.success:
+ status = 'Ready'
+ else:
+ status = 'Failed Verify'
+ if self.queue_entry:
+ if self.queue_entry.meta_host:
+ self.host.yield_work()
+ else:
+ self.queue_entry.set_status('Failed')
+
+ self.host.set_status(status)
+
+
+ def move_results(self):
+ assert self.queue_entry is not None
+ target_dir = self.queue_entry.results_dir()
+ if self.queue_entry.job.is_synchronous():
+ target_dir = os.path.join(target_dir,
+ self.queue_entry.host.hostname)
+ if not os.path.exists(target_dir):
+ os.makedirs(target_dir)
+ files = os.listdir(self.temp_results_dir)
+ for filename in files:
+ shutil.move(os.path.join(self.temp_results_dir,
+ filename),
+ os.path.join(target_dir, filename))
+
+
class VerifySynchronousTask(VerifyTask):
    """Verify step for one entry of a synchronous job.

    On success the entry moves to Pending; once every entry of the job is
    Pending the whole job is launched through a fresh agent.  A verify
    failure on a non-metahost entry stops the job's other active entries.
    """

    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        print "verify_synchronous finished: %s/%s" % (self.queue_entry.host.hostname, self.success)
        if self.success:
            if self.queue_entry.job.num_complete()==0:
                self.queue_entry.set_status('Pending')

                job = self.queue_entry.job
                if job.is_ready():
                    # This was the last entry to verify: start the job on
                    # all of its (now Pending) hosts.
                    self.agent.dispatcher.add_agent(job.run(self.queue_entry))
            else:
                self.queue_entry.set_status('Stopped') # some other entry failed verify
        else:
            if self.queue_entry.meta_host:
                # Metahost entries go back to the queue for another host.
                self.queue_entry.set_status('Queued')
            else:
                # A concrete host failed verify: stop the whole job.
                job = self.queue_entry.job
                for child_entry in job.get_host_queue_entries():
                    if child_entry.active:
                        child_entry.set_status('Stopped')
+
+
+class QueueTask(AgentTask):
+ def __init__(self, job, queue_entries, cmd):
+ AgentTask.__init__(self, cmd)
+ self.job = job
+ self.queue_entries = queue_entries
+
+
+ @staticmethod
+ def _write_keyval(queue_entry, field, value):
+ key_path = os.path.join(queue_entry.results_dir(), 'keyval')
+ keyval_file = open(key_path, 'a')
+ print >> keyval_file, '%s=%d' % (field, value)
+ keyval_file.close()
+
+
+ def prolog(self):
+ for queue_entry in self.queue_entries:
+ print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
+ queue_entry.set_status('Running')
+ queue_entry.host.set_status('Running')
+ # write some job timestamps into the job keyval file
+ queued = time.mktime(self.job.created_on.timetuple())
+ started = time.time()
+ self._write_keyval(queue_entry, "job_queued", queued)
+ self._write_keyval(queue_entry, "job_started", started)
+
+
+ def epilog(self):
+ if self.success:
+ status = 'Completed'
+ else:
+ status = 'Failed'
+
+ for queue_entry in self.queue_entries:
+ queue_entry.set_status(status)
+ queue_entry.host.set_status('Ready')
+ # write another timestamp into the job keyval file
+ finished = time.time()
+ self._write_keyval(queue_entry, "job_finished",
+ finished)
+
+ if self.job.is_synchronous() or self.job.num_machines()==1:
+ if self.job.is_finished():
+ parse_results(self.job.results_dir())
+ else:
+ for queue_entry in self.queue_entries:
+ parse_results(queue_entry.results_dir(), flags='-l 2')
+
+ print "queue_task finished with %s/%s" % (status, self.success)
+
+
class RebootTask(AgentTask):
    """Placeholder reboot task (tick/run are not implemented yet).

    Bug fix: the original __init__ took no `host` parameter yet referenced
    one (NameError on any instantiation) and passed `host` to
    AgentTask.__init__ in the `cmd` position; the command is now built
    first and passed properly.
    """

    def __init__(self, host):
        self.host = host
        # autoserv -b reboots the machine; /dev/null is the control file.
        self.cmd = "autoserv -n -b -m %s /dev/null" % host
        AgentTask.__init__(self, self.cmd)


    def tick(self, exit_code):
        # Bug fix: was a string `raise`, which is invalid/deprecated.
        raise NotImplementedError("RebootTask.tick not implemented")


    def run(self):
        raise NotImplementedError("RebootTask.run not implemented")
+
+
+
class DBObject(object):
    """Lightweight ORM base: maps one row of `table` onto attributes named
    by `fields` (which must start with 'id').

    Construct with either an id (the row is fetched) or a pre-fetched row
    tuple.  new_record=True marks the object as not yet INSERTed, enabling
    save().
    """

    def __init__(self, table, fields, id=None, row=None, new_record=False):
        # Exactly one of id/row, and table and fields are mandatory.
        assert (bool(id) != bool(row)) and table and fields

        self.__table = table
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            _cursor.execute(sql, (id,))
            if not _cursor.rowcount:
                # Bug fix: was a string `raise`, which is invalid in modern
                # Python and deprecated in 2.x; raise a real exception.
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = _cursor.fetchone()

        assert len(row)==len(fields), "table = %s, row = %s/%d, fields = %s/%d" % (table, row, len(row), fields, len(fields))

        # Expose each column as an attribute and remember which attributes
        # are real columns so update_field() can validate its input.
        self.__valid_fields = {}
        for i,value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        # The primary key is never rewritten through update_field().
        del self.__valid_fields['id']

    def count(self, where, table = None):
        """Return COUNT(*) of rows matching `where` (this object's table
        by default).

        NOTE(review): `where` is interpolated into the SQL, not bound as a
        parameter; callers only pass internally-built trusted clauses.
        """
        if not table:
            table = self.__table

        _cursor.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))
        count = _cursor.fetchall()

        return int(count[0][0])


    def num_cols(self):
        """Number of mapped columns (including id)."""
        return len(self.__fields)


    def update_field(self, field, value):
        """Write one column back to the database; no-op when unchanged."""
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
            (self.__table, field)
        _cursor.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """INSERT this object if it was created with new_record=True."""
        if self.__new_record:
            keys = self.__fields[1:]    # skip id: the DB assigns it
            columns = ','.join([str(key) for key in keys])
            # NOTE(review): values are double-quoted by string formatting;
            # acceptable for internal data, unsafe for arbitrary strings.
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                (self.__table, columns, values)
            _cursor.execute(query)


    def delete(self):
        """DELETE this object's row by primary key."""
        _cursor.execute("""DELETE FROM %s WHERE id = %%s""" % \
            self.__table, (self.id,))
+
+
class IneligibleHostQueue(DBObject):
    """Row of ineligible_host_queues: blocks a (job, host) pairing so a
    metahost entry is not rescheduled onto a host it has already used."""

    def __init__(self, id=None, row=None, new_record=None):
        DBObject.__init__(self, 'ineligible_host_queues',
                          ['id', 'job_id', 'host_id'],
                          id=id, row=row, new_record=new_record)
+
+
class Host(DBObject):
    """A machine from the hosts table, plus scheduling helpers."""

    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id','status']
        DBObject.__init__(self, 'hosts',fields, id=id, row=row)



    def current_task(self):
        """Return the single active, incomplete HostQueueEntry running on
        this host, or None when the host is idle."""
        _cursor.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if not _cursor.rowcount:
            return None
        else:
            # A host may only run one queue entry at a time.
            assert _cursor.rowcount == 1
            results = _cursor.fetchone();
#           print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        """Return the highest-priority pending queue entry eligible for
        this host (as a one-element list), or None.

        Eligible means: assigned directly to this host, or a metahost
        entry whose label matches one of the host's labels and whose job
        has not already used this host (ineligible_host_queues).
        """
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
#       print "%s/%s looking for work" % (self.hostname, self.platform_id)
        _cursor.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id,self.id, self.id))

        if not _cursor.rowcount:
            return None
        else:
            return [HostQueueEntry(row=i) for i in _cursor.fetchall()]

    def yield_work(self):
        """Requeue whatever entry is currently running on this host."""
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self,status):
        self.update_field('status',status)
+
+
class HostQueueEntry(DBObject):
    """One (job, host) pairing from host_queue_entries.

    For metahost entries host_id is initially NULL and a concrete host is
    assigned at run() time.  Status transitions also maintain the
    active/complete flags (see set_status).
    """

    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, 'host_queue_entries', fields, id=id,
                          row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry log of host assignments/releases.
        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    def set_host(self, host):
        """Bind (and activate) or release this entry's host, recording the
        change in the queue log."""
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
        else:
            self.queue_log_record('Releasing host')
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the cached Host, fetching it from host_id on demand;
        None when no host is bound."""
        if not self.host:
            if self.host_id:
                self.host = Host(self.host_id)
        if self.host:
            return self.host
        else:
            return None


    def queue_log_record(self, log_line):
        # Append one line, unbuffered, to this entry's queue log.
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(log_line + '\n')
        queue_log.close()


    def results_dir(self):
        """Results location: the job dir itself for single-machine or
        synchronous jobs, else a per-host subdirectory of it."""
        if self.job.num_machines()==1 or self.job.is_synchronous():
            results_dir = self.job.job_dir
        else:
            assert self.host
            results_dir = '%s/%s' % (self.job.job_dir,
                                     self.host.hostname)

        return results_dir


    def set_status(self, status):
        """Update status and keep the active/complete flags consistent:
        Queued -> neither, running-ish states -> active, terminal states
        (Failed/Completed/Stopped) -> complete."""
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self,assigned_host=None):
        """Start this entry (binding assigned_host for metahost entries and
        blocking the pairing via IneligibleHostQueue); returns the Agent
        produced by the job, or None."""
        if self.meta_host:
            assert assigned_host
            self.job.create_results_dir()
            self.set_host(assigned_host)
            print "creating block %s/%s" % (self.job.id,
                                            self.host.id)

            # id 0 is a placeholder; the DB assigns the real one on save.
            row = [0, self.job.id, self.host.id]
            block = IneligibleHostQueue(row=row, new_record=True)
            block.save()

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        """Put the entry back in the queue (metahost entries also drop
        their host binding)."""
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def clear_results_dir(self):
        # Wipe any partial results left by an interrupted run.
        if os.path.exists(self.results_dir()):
            shutil.rmtree(self.results_dir())
+
+
class Job(DBObject):
    """A row of the jobs table plus everything needed to launch it."""

    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,'jobs',['id','owner','name','priority','control_file',
                                       'control_type','created_on', 'synch_type', 'synch_count',
                                       'synchronizing' ], id=id, row=row)

        # All results live under RESULTS_DIR/<id>-<owner>.
        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id, self.owner))


    def is_server_job(self):
        # control_type 2 appears to denote a client-side job (autoserv gets
        # -c below for those) — TODO confirm against the frontend's enum.
        return self.control_type != 2


    def get_host_queue_entries(self):
        """Return every HostQueueEntry of this job (at least one exists)."""
        _cursor.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
            """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """Set the job's status; optionally propagate it to every entry."""
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        # synch_type 2 = synchronous (all hosts run together).
        return self.synch_type == 2


    def is_ready(self):
        """True when the job can start: always for async jobs, otherwise
        once synch_count entries have reached Pending."""
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic
        # NOTE(review): count and synch_count are ints, so in Python 2 this
        # division truncates — the expression is only True once
        # count >= synch_count, not at the apparent 50% threshold.
        # Probably intended: count / float(self.synch_count) >= 0.5.
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (count/self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally restricted by an
        extra SQL clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left==0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def write_machines_file(self):
        """Write the .machines file autoserv uses for multi-machine jobs
        (one hostname per line, entries without a host are skipped)."""
        if self.num_machines()>1:
            print "writing machines file"
            mf = open("%s/.machines" % self.job_dir, 'w')
            for queue_entry in self.get_host_queue_entries():
                if queue_entry.get_host():
                    mf.write("%s\n" % \
                        queue_entry.get_host().hostname)
            mf.close()


    def create_results_dir(self, queue_entry=None):
        """Ensure the job dir exists; return the entry's results dir when
        an entry is given, else the job dir."""
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        """Build and return the Agent that executes this job.

        For a synchronous job that is not yet ready, returns an agent that
        just verifies this entry (the last verify launches the real run);
        otherwise builds the autoserv command line (optionally preceded by
        a VerifyTask for async jobs) wrapped in a QueueTask.
        """
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(queue_entry = queue_entry)])

        queue_entry.set_status('Starting')

        # NOTE(review): os.tmpnam is insecure (filename race) and the
        # control file is never removed — consider tempfile.mkstemp.
        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            if self.num_machines() > 1:
                self.write_machines_file()
            hosts = self.get_host_queue_entries()
            hostnames = ','.join([i.host.hostname for i in hosts])
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            hostnames = queue_entry.host.hostname
            queue_entries = [queue_entry]

        params = ['autoserv', '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]

        # -c tells autoserv the control file is client-side.
        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self, queue_entries = queue_entries, cmd = params))

        agent = Agent(tasks)

        return agent
+
+
# Script entry point: hand control to the scheduler's main loop.
if __name__ == '__main__':
    main()