add missing monitor_db code



git-svn-id: http://test.kernel.org/svn/autotest/trunk@1264 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db b/scheduler/monitor_db
new file mode 100755
index 0000000..28316fe
--- /dev/null
+++ b/scheduler/monitor_db
@@ -0,0 +1,1011 @@
+#!/usr/bin/python -u
+
+"""
+Autotest scheduler
+"""
+__author__ = "Paul Turner <pjt@google.com>"
+
+import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
+import optparse, signal, smtplib, socket
+RESULTS_DIR = '.'
+AUTOSERV_NICE_LEVEL = 10
+
+AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
+
+if 'AUTOTEST_DIR' in os.environ:
+	AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
+AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
+AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
+
+if AUTOTEST_SERVER_DIR not in sys.path:
+	sys.path.insert(0, AUTOTEST_SERVER_DIR)
+
+_connection = None
+_cursor = None
+_shutdown = False
+_notify_email = None
+
+
+def main():
+	usage = 'usage: %prog [options] results_dir'
+
+	parser = optparse.OptionParser(usage)
+	parser.add_option('--no-recover', help='Skip machine/job recovery ' +
+	                  'step [for multiple monitors/rolling upgrades]',
+	                  action='store_true')
+	parser.add_option('--logfile', help='Set a log file that all stdout ' +
+	                  'should be redirected to.  Stderr will go to this ' +
+	                  'file + ".err"')
+	parser.add_option('--notify', help='Set an email address to be ' +
+	                  'notified of exceptions')
+	(options, args) = parser.parse_args()
+	if len(args) != 1:
+		parser.print_usage()
+		return
+	
+	global RESULTS_DIR
+	RESULTS_DIR = args[0]
+
+	global _notify_email
+	_notify_email = options.notify
+
+	init(options.logfile)
+	dispatcher = Dispatcher(do_recover=not options.no_recover)
+
+	try:
+		while not _shutdown:
+			dispatcher.tick()
+			time.sleep(20)
+		dispatcher.shut_down()
+	except:
+		log_stacktrace("Uncaught exception; terminating monitor_db")
+
+	disconnect()
+
+
+def handle_sigint(signum, frame):
+	global _shutdown
+	_shutdown = True
+	print "Shutdown request received."
+
+
+def init(logfile):
+	if logfile:
+		enable_logging(logfile)
+	print "%s> dispatcher starting" % time.strftime("%X %x")
+	print "My PID is %d" % os.getpid()
+
+	os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
+	connect()
+
+	print "Setting signal handler"
+	signal.signal(signal.SIGINT, handle_sigint)
+	
+	print "Connected! Running..."
+
+
+def enable_logging(logfile):
+	out_file = logfile
+	err_file = "%s.err" % logfile
+	print "Enabling logging to %s (%s)" % (out_file, err_file)
+	out_fd = open(out_file, "a", buffering=0)
+	err_fd = open(err_file, "a", buffering=0)
+
+	os.dup2(out_fd.fileno(), sys.stdout.fileno())
+	os.dup2(err_fd.fileno(), sys.stderr.fileno())
+
+	sys.stdout = out_fd
+	sys.stderr = err_fd
+
+
+def idle_hosts():
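+	# An idle host is unlocked, Ready (or of unknown status), has no
+	# active queue entry, and has at least one queued entry it could
+	# service (assigned directly or via a meta-host label).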
+	_cursor.execute("""
+		SELECT * FROM hosts h WHERE 
+		id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
+				(id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active)) 
+			OR
+				(id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe 
+				INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
+		)
+		AND locked=false AND (h.status IS null OR h.status='Ready')	""")
+	hosts = [Host(row=i) for i in _cursor.fetchall()]
+	return hosts
+
+
+def connect():
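+	# Read the DB host/schema from .database and credentials from
+	# .priv_login, falling back to .unpriv_login and then to built-in
+	# defaults.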
+	path = os.path.dirname(os.path.abspath(sys.argv[0]))
+	try:
+		db_prefs = open(os.path.join(path, '.database'), 'r')
+		DB_HOST = db_prefs.readline().rstrip()
+		DB_SCHEMA = db_prefs.readline().rstrip()
+	except:
+		DB_HOST = 'localhost'
+		DB_SCHEMA = 'autotest_web'
+
+	try:
+		login = open(os.path.join(path, '.priv_login'), 'r')
+		DB_USER = login.readline().rstrip()
+		DB_PASS = login.readline().rstrip()
+	except:
+		try:
+			login = open(os.path.join(path, '.unpriv_login'), 'r')
+			DB_USER = login.readline().rstrip()
+			DB_PASS = login.readline().rstrip()
+		except:
+			DB_USER = 'autotest'
+			DB_PASS = 'password'
+
+	global _connection, _cursor
+	_connection = MySQLdb.connect(
+		host=DB_HOST,
+		user=DB_USER,
+		passwd=DB_PASS,
+		db=DB_SCHEMA
+	)
+	_connection.autocommit(True)
+	_cursor = _connection.cursor()
+
+
+def disconnect():
+	global _connection, _cursor
+	_connection.close()
+	_connection = None
+	_cursor = None
+
+
+def parse_results(results_dir, flags=""):
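+	# Kick off the TKO results parser in the background.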
+	parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
+	output = os.path.join(results_dir, '.parse.log')
+	os.system("%s %s -r -o %s > %s 2>&1 &" % (parse, flags, results_dir, output))
+
+
+def log_stacktrace(reason):
+	(exc_type, exc_value, exc_tb) = sys.exc_info()
+	message = "EXCEPTION: %s\n" % reason
+	message += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
+	                               time.strftime("%X %x"))
+	message += ''.join(traceback.format_exception(exc_type, exc_value,
+	                                              exc_tb))
+
+	sys.stderr.write("\n%s\n" % message)
+
+	if _notify_email:
+		sender = "monitor_db"
+		subject = "monitor_db exception"
+		msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
+		         sender, _notify_email, subject, message)
+		mailer = smtplib.SMTP('localhost')
+		mailer.sendmail(sender, _notify_email, msg)
+		mailer.quit()
+
+
+class Dispatcher:
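+	"""
+	Top-level scheduler loop: optionally recovers orphaned state at
+	startup, then on each tick finds new work for idle hosts and polls
+	the running agents.
+	"""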
+	def __init__(self, do_recover=True):
+		self._agents = []
+		self.shutting_down = False
+
+		if do_recover:
+			self._recover_lost()
+		
+
+	def shut_down(self):
+		print "Shutting down!"
+		self.shutting_down = True
+		while self._agents:
+			self.tick()
+			time.sleep(40)
+
+
+	def tick(self):
+		if not self.shutting_down:
+			self._find_more_work()
+		self._handle_agents()
+
+
+	def add_agent(self, agent):
+		self._agents.append(agent)
+		agent.dispatcher = self
+
+
+	def _recover_lost(self):
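+		# Requeue entries left active by a previous scheduler run,
+		# then re-verify every unlocked host not known to be Ready.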
+		_cursor.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete""")
+		if _cursor.rowcount:
+			queue_entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
+			for queue_entry in queue_entries:
+				job = queue_entry.job
+				if job.is_synchronous():
+					for child_entry in job.get_host_queue_entries():
+						child_entry.requeue()
+				else:
+					queue_entry.requeue()
+				queue_entry.clear_results_dir()
+			
+		_cursor.execute("""SELECT * FROM hosts
+		                   WHERE status != 'Ready' AND NOT locked""")
+		if _cursor.rowcount:
+			hosts = [Host(row=i) for i in _cursor.fetchall()]
+			for host in hosts:
+				verify_task = VerifyTask(host=host)
+				self.add_agent(Agent(tasks=[verify_task]))
+
+
+	def _find_more_work(self):
+		print "finding work"
+
+		num_started = 0
+		for host in idle_hosts():
+			tasks = host.next_queue_entries()
+			if tasks:
+				for next in tasks:
+					try:
+						agent = next.run(assigned_host=host)
+						if agent:
+							self.add_agent(agent)
+
+							num_started += 1
+							if num_started >= 100:
+								return
+							break
+					except:
+						next.set_status('Failed')
+
+#						if next.host:
+#							next.host.set_status('Ready')
+
+						log_stacktrace("task_id = %d" % next.id)
+
+
+	def _handle_agents(self):
+		still_running = []
+		for agent in self._agents:
+			agent.tick()
+			if not agent.is_done():
+				still_running.append(agent)
+			else:
+				print "agent finished"
+		self._agents = still_running
+
+
+class RunMonitor(object):
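+	"""
+	Launches a command (optionally under nice(1)) with output appended
+	to a log file, and exposes its exit status.
+	"""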
+	def __init__(self, cmd, nice_level=None, log_file=None):
+		self.nice_level = nice_level
+		self.log_file = log_file
+		self.proc = self.run(cmd)
+
+	def run(self, cmd):
+		if self.nice_level:
+			nice_cmd = ['nice','-n', str(self.nice_level)]
+			nice_cmd.extend(cmd)
+			cmd = nice_cmd
+
+		out_file = None
+		if self.log_file:
+			try:
+				out_file = open(self.log_file, 'a')
+				out_file.write("\n%s\n" % ('*'*80))
+				out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
+				out_file.write("%s\n" % ('*'*80))
+			except:
+				pass
+				
+		if not out_file:
+			out_file = open('/dev/null', 'w')
+			
+		in_devnull = open('/dev/null', 'r')
+		print "cmd = %s" % cmd
+		print "path = %s" % os.getcwd()
+
+		proc = subprocess.Popen(cmd, stdout=out_file,
+					stderr=subprocess.STDOUT, stdin=in_devnull)
+		out_file.close()
+		in_devnull.close()
+		return proc
+
+
+	def kill(self):
+		# Popen.kill() requires python 2.6; send the signal directly.
+		os.kill(self.proc.pid, signal.SIGKILL)
+
+
+	def exit_code(self):
+		return self.proc.poll()
+
+
+class Agent(object):
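+	"""
+	A FIFO queue of AgentTasks run one at a time; when a task fails,
+	the remaining queue is replaced by that task's failure_tasks, if
+	any.
+	"""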
+	def __init__(self, tasks):
+		self.active_task = None
+		self.queue = Queue.Queue(0)
+		self.dispatcher = None
+		
+		for task in tasks:
+			self.add_task(task)
+
+
+	def add_task(self, task):
+		self.queue.put_nowait(task)
+		task.agent = self
+
+
+	def tick(self):
+		print "agent tick"
+		if self.active_task and not self.active_task.is_done():
+			self.active_task.poll()
+		else:
+			self._next_task()
+
+
+	def _next_task(self):
+		print "agent picking task"
+		if self.active_task:
+			assert self.active_task.is_done()
+
+			if not self.active_task.success and self.active_task.failure_tasks:
+				self.queue = Queue.Queue(0)
+				for task in self.active_task.failure_tasks:
+					self.add_task(task)
+		
+		self.active_task = None
+		if not self.is_done():
+			self.active_task = self.queue.get_nowait()
+			if self.active_task:
+				self.active_task.start()
+
+
+	def is_done(self):
+		return self.active_task is None and self.queue.empty()
+
+
+	def start(self):
+		assert self.dispatcher
+
+		self._next_task()
+
+
+class AgentTask(object):
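+	"""
+	Base class for a single unit of scheduler work, normally one
+	autoserv invocation run through a RunMonitor.
+	"""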
+	def __init__(self, cmd, failure_tasks=None):
+		self.done = False
+		self.failure_tasks = failure_tasks
+		self.started = False
+		self.cmd = cmd
+		self.agent = None
+
+
+	def poll(self):
+		print "poll"
+		if hasattr(self, 'monitor'):
+			self.tick(self.monitor.exit_code())
+		else:
+			self.finished(False)
+
+
+	def tick(self, exit_code):
+		if exit_code is None:
+			return
+#		print "exit_code was %d" % exit_code
+		success = (exit_code == 0)
+
+		self.finished(success)
+
+
+	def is_done(self):
+		return self.done
+
+
+	def finished(self, success):
+		self.done = True
+		self.success = success
+		self.epilog()
+
+
+	def prolog(self):
+		pass
+
+
+	def epilog(self):
+		pass
+
+
+	def start(self):
+		assert self.agent
+
+		if not self.started:
+			self.prolog()
+			self.run()
+
+		self.started = True
+
+
+	def abort(self):
+		self.monitor.kill()
+
+
+	def run(self):
+		if self.cmd:
+			print "agent starting monitor"
+
+			log_file = None
+			if hasattr(self, 'host'):
+				log_file = os.path.join(RESULTS_DIR, 'hosts',
+							self.host.hostname)
+
+			self.monitor = RunMonitor(self.cmd,
+						  nice_level=AUTOSERV_NICE_LEVEL,
+						  log_file=log_file)
+
+
+class RepairTask(AgentTask):
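+	"""Repairs a host (autoserv -R); used as the failure task for
+	verify."""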
+	def __init__(self, host):
+		cmd = ['autoserv', '-n', '-R', '-m', host.hostname]
+		self.host = host
+		AgentTask.__init__(self, cmd)
+
+	def prolog(self):
+		print "repair_task starting"
+		self.host.set_status('Repairing')
+
+
+	def epilog(self):
+		if self.success:
+			status = 'Ready'
+		else:
+			status = 'Repair Failed'
+
+		self.host.set_status(status)
+
+
+class VerifyTask(AgentTask):
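+	"""
+	Runs 'autoserv -v' against a host (given directly or via a queue
+	entry) into a temporary results dir; on failure the agent falls
+	back to a RepairTask.
+	"""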
+	def __init__(self, queue_entry=None, host=None):
+		assert bool(queue_entry) != bool(host)
+
+		self.host = host or queue_entry.host
+		self.queue_entry = queue_entry
+
+		self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
+		cmd = ['autoserv', '-n', '-v', '-m', self.host.hostname,
+		       '-r', self.temp_results_dir]
+
+		AgentTask.__init__(self, cmd, failure_tasks=[RepairTask(self.host)])
+
+
+	def prolog(self):
+		print "starting verify on %s" % (self.host.hostname)
+		if self.queue_entry:
+			self.queue_entry.set_status('Verifying')
+		self.host.set_status('Verifying')
+
+
+	def epilog(self):
+		if self.queue_entry and (self.success or
+					 not self.queue_entry.meta_host):
+			self.move_results()
+		else:
+			shutil.rmtree(self.temp_results_dir)
+
+		if self.success:
+			status = 'Ready'
+		else:
+			status = 'Failed Verify'
+			if self.queue_entry:
+				if self.queue_entry.meta_host:
+					self.host.yield_work()
+				else:
+					self.queue_entry.set_status('Failed')
+
+		self.host.set_status(status)
+
+
+	def move_results(self):
+		assert self.queue_entry is not None
+		target_dir = self.queue_entry.results_dir()
+		if self.queue_entry.job.is_synchronous():
+			target_dir = os.path.join(target_dir,
+						 self.queue_entry.host.hostname)
+		if not os.path.exists(target_dir):
+			os.makedirs(target_dir)
+		files = os.listdir(self.temp_results_dir)
+		for filename in files:
+			shutil.move(os.path.join(self.temp_results_dir,
+						 filename),
+				    os.path.join(target_dir, filename))
+
+
+class VerifySynchronousTask(VerifyTask):
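+	"""
+	Verify step for one machine of a synchronous job: entries go to
+	'Pending' as they pass verify, and the job starts once enough
+	entries are Pending (see Job.is_ready).
+	"""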
+	def __init__(self, queue_entry):
+		VerifyTask.__init__(self, queue_entry=queue_entry)
+
+
+	def epilog(self):
+		VerifyTask.epilog(self)
+		print "verify_synchronous finished: %s/%s" % (self.queue_entry.host.hostname, self.success)
+		if self.success:
+			if self.queue_entry.job.num_complete() == 0:
+				self.queue_entry.set_status('Pending')
+	
+				job = self.queue_entry.job
+				if job.is_ready():
+					self.agent.dispatcher.add_agent(job.run(self.queue_entry))
+			else:
+				self.queue_entry.set_status('Stopped') # some other entry failed verify
+		else:		
+			if self.queue_entry.meta_host:
+				self.queue_entry.set_status('Queued')
+			else:
+				job = self.queue_entry.job
+				for child_entry in job.get_host_queue_entries():
+					if child_entry.active:
+						child_entry.set_status('Stopped')
+
+
+class QueueTask(AgentTask):
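+	"""
+	Runs the job's autoserv command itself, recording job_queued,
+	job_started and job_finished timestamps in each entry's keyval
+	file.
+	"""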
+	def __init__(self, job, queue_entries, cmd):
+		AgentTask.__init__(self, cmd)
+		self.job = job
+		self.queue_entries = queue_entries
+
+
+	@staticmethod
+	def _write_keyval(queue_entry, field, value):
+		key_path = os.path.join(queue_entry.results_dir(), 'keyval')
+		keyval_file = open(key_path, 'a')
+		print >> keyval_file, '%s=%d' % (field, value)
+		keyval_file.close()
+
+
+	def prolog(self):
+		for queue_entry in self.queue_entries:
+			print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
+			queue_entry.set_status('Running')
+			queue_entry.host.set_status('Running')
+			# write some job timestamps into the job keyval file
+			queued = time.mktime(self.job.created_on.timetuple())
+			started = time.time()
+			self._write_keyval(queue_entry, "job_queued", queued)
+			self._write_keyval(queue_entry, "job_started", started)
+
+
+	def epilog(self):
+		if self.success:
+			status = 'Completed'
+		else:
+			status = 'Failed'
+
+		for queue_entry in self.queue_entries:
+			queue_entry.set_status(status)
+			queue_entry.host.set_status('Ready')
+			# write another timestamp into the job keyval file
+			finished = time.time()
+			self._write_keyval(queue_entry, "job_finished",
+					   finished)
+
+		if self.job.is_synchronous() or self.job.num_machines() == 1:
+			if self.job.is_finished():
+				parse_results(self.job.results_dir())
+		else:
+			for queue_entry in self.queue_entries:
+				parse_results(queue_entry.results_dir(), flags='-l 2')
+		
+		print "queue_task finished with %s/%s" % (status, self.success)
+
+
+class RebootTask(AgentTask):
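+	"""Reboots a host via 'autoserv -b' (not implemented yet)."""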
+	def __init__(self, host):
+		self.host = host
+		cmd = ['autoserv', '-n', '-b', '-m', host.hostname,
+		       '/dev/null']
+		AgentTask.__init__(self, cmd)
+
+
+	def tick(self, exit_code):
+		raise NotImplementedError
+
+
+	def run(self):
+		raise NotImplementedError
+
+
+class DBObject(object):
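+	"""
+	Thin wrapper for a database row: loads the row by id (or from an
+	already-fetched tuple) and exposes its columns as attributes, with
+	helpers to update, insert and delete.
+	"""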
+	def __init__(self, table, fields, id=None, row=None, new_record=False):
+		assert (bool(id) != bool(row)) and table and fields
+
+		self.__table = table
+		self.__fields = fields
+
+		self.__new_record = new_record
+
+		if row is None:
+			sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
+			_cursor.execute(sql, (id,))
+			if not _cursor.rowcount:
+				raise "row not found (table=%s, id=%s)" % \
+							(self.__table, id)
+			row = _cursor.fetchone()
+
+		assert len(row) == len(fields), \
+			"table = %s, row = %s/%d, fields = %s/%d" % \
+			(table, row, len(row), fields, len(fields))
+
+		self.__valid_fields = {}
+		for i,value in enumerate(row):
+			self.__dict__[fields[i]] = value
+			self.__valid_fields[fields[i]] = True
+
+		del self.__valid_fields['id']
+
+	def count(self, where, table = None):
+		if not table:
+			table = self.__table
+
+		_cursor.execute("""
+			SELECT count(*) FROM %s
+			WHERE %s
+		""" % (table, where))
+		count = _cursor.fetchall()
+
+		return int(count[0][0])
+
+
+	def num_cols(self):
+		return len(self.__fields)
+
+
+	def update_field(self, field, value):
+		assert field in self.__valid_fields
+
+		if self.__dict__[field] == value:
+			return
+
+		query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
+							(self.__table, field)
+		_cursor.execute(query, (value, self.id))
+
+		self.__dict__[field] = value
+
+
+	def save(self):
+		if self.__new_record:
+			keys = self.__fields[1:] # avoid id
+			columns = ','.join([str(key) for key in keys])
+			values = ['"%s"' % self.__dict__[key] for key in keys]
+			values = ','.join(values)
+			query = """INSERT INTO %s (%s) VALUES (%s)""" % \
+						(self.__table, columns, values)
+			_cursor.execute(query)
+
+
+	def delete(self):
+		_cursor.execute("""DELETE FROM %s WHERE id = %%s""" % \
+						self.__table, (self.id,))
+
+
+class IneligibleHostQueue(DBObject):
+	def __init__(self, id=None, row=None, new_record=None):
+		fields = ['id', 'job_id', 'host_id']
+		DBObject.__init__(self, 'ineligible_host_queues', fields,
+					id=id, row=row, new_record=new_record)
+
+
+class Host(DBObject):
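+	"""A machine that queue entries can be assigned to and run on."""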
+	def __init__(self, id=None, row=None):
+		fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
+		DBObject.__init__(self, 'hosts', fields, id=id, row=row)
+
+
+	def current_task(self):
+		_cursor.execute("""
+			SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
+			""", (self.id,))
+		
+		if not _cursor.rowcount:
+			return None
+		else:
+			assert _cursor.rowcount == 1
+			results = _cursor.fetchone()
+#			print "current = %s" % results
+			return HostQueueEntry(row=results)
+
+
+	def next_queue_entries(self):
+		if self.locked:
+			print "%s locked, not queuing" % self.hostname
+			return None
+#		print "%s/%s looking for work" % (self.hostname, self.platform_id)
+		_cursor.execute("""
+			SELECT * FROM host_queue_entries
+			WHERE ((host_id=%s) OR (meta_host IS NOT null AND
+			(meta_host IN (
+				SELECT label_id FROM hosts_labels WHERE host_id=%s
+				)
+			)
+			AND job_id NOT IN ( 
+				SELECT job_id FROM ineligible_host_queues
+				WHERE host_id=%s
+			)))
+			AND NOT complete AND NOT active
+			ORDER BY priority DESC, meta_host, id
+			LIMIT 1
+		""", (self.id,self.id, self.id))
+
+		if not _cursor.rowcount:
+			return None
+		else:
+			return [HostQueueEntry(row=i) for i in _cursor.fetchall()]
+	
+	def yield_work(self):
+		print "%s yielding work" % self.hostname
+		if self.current_task():
+			self.current_task().requeue()
+		
+	def set_status(self, status):
+		self.update_field('status', status)
+
+
+class HostQueueEntry(DBObject):
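+	"""One host's (or meta-host's) slot within a job's queue."""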
+	def __init__(self, id=None, row=None):
+		assert id or row
+		fields = ['id', 'job_id', 'host_id', 'priority', 'status',
+			  'meta_host', 'active', 'complete']
+		DBObject.__init__(self, 'host_queue_entries', fields, id=id,
+									row=row)
+
+		self.job = Job(self.job_id)
+
+		if self.host_id:
+			self.host = Host(self.host_id)
+		else:
+			self.host = None
+
+		self.queue_log_path = os.path.join(self.job.results_dir(),
+						   'queue.log.' + str(self.id))
+
+
+	def set_host(self, host):
+		if host:
+			self.queue_log_record('Assigning host ' + host.hostname)
+			self.update_field('host_id', host.id)
+			self.update_field('active', True)
+		else:
+			self.queue_log_record('Releasing host')
+			self.update_field('host_id', None)
+
+		self.host = host
+
+
+	def get_host(self):
+		if not self.host and self.host_id:
+			self.host = Host(self.host_id)
+		return self.host
+
+
+	def queue_log_record(self, log_line):
+		queue_log = open(self.queue_log_path, 'a', 0)
+		queue_log.write(log_line + '\n')
+		queue_log.close()
+
+
+	def results_dir(self):
+		if self.job.num_machines() == 1 or self.job.is_synchronous():
+			results_dir = self.job.job_dir
+		else:
+			assert self.host
+			results_dir = '%s/%s' % (self.job.job_dir,
+						 self.host.hostname)
+
+		return results_dir
+
+
+	def set_status(self, status):
+		self.update_field('status', status)
+		if self.host:
+			hostname = self.host.hostname
+		else:
+			hostname = 'no host'
+		print "%s/%d status -> %s" % (hostname, self.id, self.status)
+		if status in ['Queued']:
+			self.update_field('complete', False)
+			self.update_field('active', False)
+
+		if status in ['Pending', 'Running', 'Verifying', 'Starting']:
+			self.update_field('complete', False)
+			self.update_field('active', True)
+
+		if status in ['Failed', 'Completed', 'Stopped']:
+			self.update_field('complete', True)
+			self.update_field('active', False)
+
+
+	def run(self, assigned_host=None):
+		if self.meta_host:
+			assert assigned_host
+			self.job.create_results_dir()
+			self.set_host(assigned_host)
+			print "creating block %s/%s" % (self.job.id,
+							self.host.id)
+
+			row = [0, self.job.id, self.host.id]
+			block = IneligibleHostQueue(row=row, new_record=True)
+			block.save()
+		
+		print "%s/%s scheduled on %s, status=%s" % (self.job.name,
+				self.meta_host, self.host.hostname, self.status)
+
+		return self.job.run(queue_entry=self)
+	
+	def requeue(self):
+		self.set_status('Queued')
+		
+		if self.meta_host:
+			self.set_host(None)
+
+
+	def clear_results_dir(self):
+		if os.path.exists(self.results_dir()):
+			shutil.rmtree(self.results_dir())
+
+
+class Job(DBObject):
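+	"""A submitted job, spanning one or more host queue entries."""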
+	def __init__(self, id=None, row=None):
+		assert id or row
+		fields = ['id', 'owner', 'name', 'priority', 'control_file',
+			  'control_type', 'created_on', 'synch_type',
+			  'synch_count', 'synchronizing']
+		DBObject.__init__(self, 'jobs', fields, id=id, row=row)
+
+		self.job_dir = os.path.join(RESULTS_DIR,
+					    "%s-%s" % (self.id, self.owner))
+
+
+	def is_server_job(self):
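+		# control_type 2 denotes a client-side control file (run
+		# with autoserv -c below)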
+		return self.control_type != 2
+
+
+	def get_host_queue_entries(self):
+		_cursor.execute("""
+			SELECT * FROM host_queue_entries
+			WHERE job_id= %s
+		""", (self.id,))
+		entries = [HostQueueEntry(row=i) for i in _cursor.fetchall()]
+
+		assert len(entries)>0
+
+		return entries
+
+
+	def set_status(self, status, update_queues=False):
+		self.update_field('status', status)
+
+		if update_queues:
+			for queue_entry in self.get_host_queue_entries():
+				queue_entry.set_status(status)
+
+
+	def is_synchronous(self):
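+		# synch_type 2: the job must run on all its hosts at once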
+		return self.synch_type == 2
+
+
+	def is_ready(self):
+		if not self.is_synchronous():
+			return True
+		sql = "job_id=%s AND status='Pending'" % self.id
+		count = self.count(sql, table='host_queue_entries')
+		return (count == self.synch_count)
+
+
+	def ready_to_synchronize(self):
+		# heuristic
+		queue_entries = self.get_host_queue_entries()
+		count = 0
+		for queue_entry in queue_entries:
+			if queue_entry.status == 'Pending':
+				count += 1
+
+		# float() avoids python 2 integer division
+		return (float(count) / self.synch_count >= 0.5)
+
+
+	def start_synchronizing(self):
+		self.update_field('synchronizing', True)
+
+
+	def results_dir(self):
+		return self.job_dir
+
+	def num_machines(self, clause=None):
+		sql = "job_id=%s" % self.id
+		if clause:
+			sql += " AND (%s)" % clause
+		return self.count(sql, table='host_queue_entries')
+
+
+	def num_queued(self):
+		return self.num_machines('not complete')
+
+
+	def num_active(self):
+		return self.num_machines('active')
+
+
+	def num_complete(self):
+		return self.num_machines('complete')
+
+
+	def is_finished(self):
+		left = self.num_queued()
+		print "%s: %s machines left" % (self.name, left)
+		return left == 0
+
+	def stop_synchronizing(self):
+		self.update_field('synchronizing', False)
+		self.set_status('Queued', update_queues=False)
+
+
+	def write_machines_file(self):
+		if self.num_machines() > 1:
+			print "writing machines file"
+			mf = open("%s/.machines" % self.job_dir, 'w')
+			for queue_entry in self.get_host_queue_entries():
+				if queue_entry.get_host():
+					mf.write("%s\n" % \
+						queue_entry.get_host().hostname)
+			mf.close()
+
+
+	def create_results_dir(self, queue_entry=None):
+		print "create: active: %s complete %s" % (self.num_active(),
+							  self.num_complete())
+
+		if not os.path.exists(self.job_dir):
+			os.makedirs(self.job_dir)
+
+		if queue_entry:
+			return queue_entry.results_dir()
+		return self.job_dir
+
+
+	def run(self, queue_entry):
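+		# Synchronous jobs keep verifying hosts one at a time until
+		# enough entries are Pending; only then is the combined
+		# autoserv command launched.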
+		results_dir = self.create_results_dir(queue_entry)
+
+		if self.is_synchronous():
+			if not self.is_ready():
+				return Agent([VerifySynchronousTask(queue_entry=queue_entry)])
+
+		queue_entry.set_status('Starting')
+
+		# os.tmpnam() is racy and warns; create the temporary
+		# control file with tempfile instead.
+		control_fd, control_filename = tempfile.mkstemp()
+		ctrl = os.fdopen(control_fd, 'w')
+		if self.control_file:
+			ctrl.write(self.control_file)
+		else:
+			ctrl.write("")
+		ctrl.flush()
+
+		if self.is_synchronous():
+			if self.num_machines() > 1:
+				self.write_machines_file()
+			queue_entries = self.get_host_queue_entries()
+			hostnames = ','.join([entry.host.hostname
+					      for entry in queue_entries])
+		else:
+			assert queue_entry
+			hostnames = queue_entry.host.hostname
+			queue_entries = [queue_entry]
+
+		params = ['autoserv', '-n', '-r', results_dir,
+			'-b', '-u', self.owner, '-l', self.name,
+			'-m', hostnames, control_filename]
+
+		if not self.is_server_job():
+			params.append('-c')
+
+		tasks = []
+		if not self.is_synchronous():
+			tasks.append(VerifyTask(queue_entry))
+
+		tasks.append(QueueTask(job=self, queue_entries=queue_entries,
+				       cmd=params))
+
+		agent = Agent(tasks)
+
+		return agent
+
+
+if __name__ == '__main__':
+	main()