#!/usr/bin/python -u

"""
Autotest scheduler
"""
__author__ = "Paul Turner <pjt@google.com>"

import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
import optparse, signal, smtplib, socket, datetime, stat
from common import global_config

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False

def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--no-recover', help='Skip machine/job recovery ' +
                      'step [for multiple monitors/rolling upgrades]',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher(do_recover = not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd

def idle_hosts():
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
        (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
        OR
        (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
         INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in rows]
    return hosts


def queue_entries_to_abort():
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)


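# DatabaseConn wraps the MySQLdb connection used throughout the scheduler: it
# reads the AUTOTEST_WEB settings from the global config, enables autocommit,
# and transparently reconnects (retrying every reconnect_wait seconds) when
# the connection is lost.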
class DatabaseConn:
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        self.connect()


    def connect(self):
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        global _testing_mode
        if _testing_mode:
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(host=DB_HOST,
                                            user=DB_USER,
                                            passwd=DB_PASS,
                                            db=DB_SCHEMA)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                print "Can't connect to MySQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                print "MySQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()


def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)


def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    os.system(generate_parse_command(results_dir, flags))


def log_stacktrace(reason):
    (exc_type, value, tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                   time.strftime("%X %x"))
    message += ''.join(traceback.format_exception(exc_type, value, tb))

    sys.stderr.write("\n%s\n" % message)

    if _notify_email:
        sender = "monitor_db"
        subject = "monitor_db exception"
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            sender, _notify_email, subject, message)
        mailer = smtplib.SMTP('localhost')
        mailer.sendmail(sender, _notify_email, msg)
        mailer.quit()


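# The Dispatcher drives the scheduler's main loop: on each tick it handles
# queue entries marked 'Abort', assigns queued work to idle hosts, and polls
# the Agents that are running autoserv processes.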
class Dispatcher:
    def __init__(self, do_recover=True):
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        print "Shutting down!"
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        if not self.shutting_down:
            self._find_aborting()
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def _recover_lost(self):
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Abort' AND status != 'Aborting'""")
        if len(rows) > 0:
            queue_entries = [HostQueueEntry(row=i) for i in rows]
            for queue_entry in queue_entries:
                job = queue_entry.job
                if job.is_synchronous():
                    for child_entry in job.get_host_queue_entries():
                        child_entry.requeue()
                else:
                    queue_entry.requeue()
                queue_entry.clear_results_dir()

        rebooting_host_ids = []
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        if len(rows) > 0:
            queue_entries = [HostQueueEntry(row=i) for i in rows]
            for queue_entry in queue_entries:
                queue_host = queue_entry.get_host()
                reboot_task = RebootTask(queue_host)
                verify_task = VerifyTask(host = queue_host)
                self.add_agent(Agent(tasks=[reboot_task, verify_task],
                                     queue_entry_ids=[queue_entry.id]))
                queue_entry.set_status('Aborted')
                # Secure the host from being picked up
                queue_host.set_status('Rebooting')
                rebooting_host_ids.append(queue_host.id)

        rows = _db.execute("""SELECT * FROM hosts
                              WHERE status != 'Ready' AND NOT locked""")
        if len(rows) > 0:
            hosts = [Host(row=i) for i in rows]
            for host in hosts:
                if host.id in rebooting_host_ids:
                    continue
                verify_task = VerifyTask(host = host)
                self.add_agent(Agent(tasks = [verify_task]))


    def _find_more_work(self):
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started >= 100:
                                return
                        break
                    except:
                        next.set_status('Failed')

#                        if next.host:
#                            next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
            # just to make sure this host does not get
            # taken away
            entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running


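# RunMonitor launches a single subprocess (optionally under nice), redirects
# its output to an optional log file, and exposes the poll/kill helpers used
# by AgentTask.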
class RunMonitor(object):
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*' * 80))
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*' * 80))
            except:
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        out_file.close()
        in_devnull.close()
        return proc


    def kill(self):
        if self.proc.poll() is None:
            os.kill(self.proc.pid, signal.SIGCONT)
            os.kill(self.proc.pid, signal.SIGTERM)


    def exit_code(self):
        return self.proc.poll()


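# An Agent owns an ordered queue of AgentTasks for a set of queue entries;
# when the active task fails, the remaining queue is replaced by that task's
# failure_tasks.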
class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[]):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


class AgentTask(object):
    def __init__(self, cmd, failure_tasks = []):
        """\
        On failure, the Agent's task queue is cleared and replaced
        with the tasks in failure_tasks.
        """
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
#        print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def epilog(self):
        pass


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True


    def run(self):
        if self.cmd:
            print "agent starting monitor"

            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(os.path.join(RESULTS_DIR, 'hosts'),
                                        self.host.hostname)

            self.monitor = RunMonitor(self.cmd,
                                      nice_level = AUTOSERV_NICE_LEVEL,
                                      log_file = log_file)


class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        cmd = [_autoserv_path, '-R', '-m', host.hostname]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        AgentTask.__init__(self, cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        AgentTask.__init__(self, cmd, failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def epilog(self):
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        shutil.rmtree(self.temp_results_dir)

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            self.force_move(os.path.join(self.temp_results_dir, filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' + dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            self.queue_entry.set_status('Pending')
            job = self.queue_entry.job
            if job.is_ready():
                agent = job.run(self.queue_entry)
                self.agent.dispatcher.add_agent(agent)

class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        key_path = os.path.join(queue_entry.results_dir(), 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def prolog(self):
        # write the parser commands into the results directories
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            results_dir = self.job.results_dir()
            cmdfile = os.path.join(results_dir, '.parse.cmd')
            cmd = generate_parse_command(results_dir)
            print >> open(cmdfile, 'w'), cmd
        else:
            for queue_entry in self.queue_entries:
                results_dir = queue_entry.results_dir()
                cmdfile = os.path.join(results_dir, '.parse.cmd')
                cmd = generate_parse_command(results_dir, '-l 2')
                print >> open(cmdfile, 'w'), cmd

        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.queue_entries[0], "job_queued", queued)
        self._write_keyval(self.queue_entries[0], "job_started", started)
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
                self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.queue_entries[0], "job_finished", finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)


class RebootTask(AgentTask):
    def __init__(self, host):
        global _autoserv_path

        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '/dev/null']
        self.host = host
        AgentTask.__init__(self, self.cmd,
                           failure_tasks=[RepairTask(host)])


    def prolog(self):
        print "starting reboot task for host: %s" % self.host.hostname
        self.host.set_status("Rebooting")


class AbortTask(AgentTask):
    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for agent in agents_to_abort:
            agent.dispatcher.remove_agent(agent)
        AgentTask.__init__(self, '')


    def prolog(self):
        print "starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id)
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        self.queue_entry.set_status('Aborted')
        self.success = True

    def run(self):
        for agent in self.agents_to_abort:
            if agent.active_task:
                agent.active_task.abort()


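# DBObject is a minimal row wrapper shared by Host, Job, HostQueueEntry, etc.
# It loads a row by id (or accepts one directly), exposes the columns as
# attributes, and provides update_field/save/delete/fetch helpers built on
# the global _db connection.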
class DBObject(object):
    def __init__(self, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table = None):
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
            (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        if self.__new_record:
            keys = self.__fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                (self.__table, columns, values)
            _db.execute(query)


    def delete(self):
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)


class IneligibleHostQueue(DBObject):
    def __init__(self, id=None, row=None, new_record=None):
        fields = ['id', 'job_id', 'host_id']
        DBObject.__init__(self, fields, id=id, row=row,
                          new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'


class Host(DBObject):
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0]
#            print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
#        print "%s/%s looking for work" % (self.hostname, self.platform_id)
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        else:
            return [HostQueueEntry(row=i) for i in rows]

    def yield_work(self):
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self, status):
        self.update_field('status', status)


class HostQueueEntry(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, fields, id=id, row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(now + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        print "creating block %s/%s" % (self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        print "removing block %s/%s" % (self.job.id, host_id)
        blocks = list(IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id)))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self, assigned_host=None):
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
            self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None):
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        for filename in os.listdir(results_dir):
            if 'queue.log' in filename:
                continue
            path = os.path.join(results_dir, filename)
            remove_file_or_dir(path)


class Job(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,
                          ['id', 'owner', 'name', 'priority',
                           'control_file', 'control_type', 'created_on',
                           'synch_type', 'synch_count', 'synchronizing'],
                          id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        return self.control_type != 2


    def get_host_queue_entries(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic: at least half of the entries are pending
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (float(count) / self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left == 0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        print "writing %s to job %s machines file" % (hostname, self.id)
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        mf.write("%s\n" % queue_entry.get_host().hostname)
        mf.close()


    def create_results_dir(self, queue_entry=None):
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry = queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        ids = []
        for entry in queue_entries:
            ids.append(entry.id)

        agent = Agent(tasks, ids)

        return agent


if __name__ == '__main__':
    main()