blob: 5faa76333fa5ab500562dcca475cca7e7b3626a2 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighe2586682008-02-29 22:45:46 +00009import optparse, signal, smtplib, socket, datetime, stat
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Directory under which results are written; main() overwrites this with the
# positional command-line argument.
RESULTS_DIR = '.'
# Niceness handed to `nice -n` when autoserv child processes are launched.
AUTOSERV_NICE_LEVEL = 10

# Root of the autotest tree: the parent of this file's directory, unless the
# AUTOTEST_DIR environment variable overrides it.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# `in` replaces the deprecated dict.has_key(), which Python 3 removed.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Module-wide state, reassigned by main(), init() and handle_sigint().
_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
mbligh36768f02008-02-22 18:28:33 +000030
31
def main():
    """Parse the command line, set up the scheduler and run its main loop.

    Exactly one positional argument (the results directory) is expected;
    otherwise the usage text is printed and the function returns.  The
    dispatcher is ticked every 20 seconds until a shutdown is requested and
    is then drained with shut_down().  Any exception escaping that loop is
    reported through log_stacktrace(), after which the database connection
    is closed.
    """
    global RESULTS_DIR, _notify_email, _autoserv_path, _testing_mode

    parser = optparse.OptionParser('usage: %prog [options] results_dir')
    parser.add_option('--no-recover', action='store_true',
                      help=('Skip machine/job recovery '
                            'step [for multiple monitors/rolling upgrades]'))
    parser.add_option('--logfile',
                      help=('Set a log file that all stdout '
                            'should be redirected to. Stderr will go to this '
                            'file + ".err"'))
    parser.add_option('--notify',
                      help=('Set an email address to be '
                            'notified of exceptions'))
    parser.add_option('--test', action='store_true',
                      help=('Indicate that scheduler is under '
                            'test and should use dummy autoserv and no '
                            'parsing'))
    options, positional = parser.parse_args()
    if len(positional) != 1:
        parser.print_usage()
        return

    RESULTS_DIR = positional[0]
    _notify_email = options.notify

    if options.test:
        # Test mode swaps in the dummy autoserv binary.
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher(do_recover=not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        # Top-level boundary: report whatever escaped the scheduling loop.
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +000076
77
def handle_sigint(signum, frame):
    """Signal handler: flag the main loop to stop after the current tick.

    Both arguments are supplied by the signal machinery and are ignored.
    """
    global _shutdown
    _shutdown = True
    print("Shutdown request received.")
82
83
def init(logfile):
    """Prepare the process for scheduling.

    Optionally redirects output to `logfile`, prepends the autotest server
    directory to PATH, opens the global database connection and installs
    the SIGINT handler.
    """
    global _db

    if logfile:
        enable_logging(logfile)
    print("%s> dispatcher starting" % time.strftime("%X %x"))
    print("My PID is %d" % os.getpid())

    # Put the server directory first on PATH.
    os.environ['PATH'] = ':'.join([AUTOTEST_SERVER_DIR, os.environ['PATH']])
    _db = DatabaseConn()

    print("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    print("Connected! Running...")
98
99
def enable_logging(logfile):
    """Redirect stdout to `logfile` and stderr to `logfile` + ".err".

    Both files are opened unbuffered in append mode.  The redirection is
    done at the file-descriptor level (os.dup2) as well as by rebinding
    sys.stdout / sys.stderr.
    """
    err_path = "%s.err" % logfile
    print("Enabling logging to %s (%s)" % (logfile, err_path))
    stdout_log = open(logfile, "a", buffering=0)
    stderr_log = open(err_path, "a", buffering=0)

    # Re-point the existing descriptors first, then the Python-level objects.
    for log, stream in ((stdout_log, sys.stdout), (stderr_log, sys.stderr)):
        os.dup2(log.fileno(), stream.fileno())

    sys.stdout = stdout_log
    sys.stderr = stderr_log
112
113
def idle_hosts():
    """Return Host objects that are free and have queued work waiting.

    A host qualifies when it is unlocked, its status is NULL or 'Ready', it
    has no active queue entry, and there is a pending (neither complete nor
    active) queue entry either assigned to it directly or whose meta_host
    is one of the host's labels.
    """
    query = """
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
            INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """
    return [Host(row=record) for record in _db.execute(query)]
126
def queue_entries_to_abort():
    """Return a HostQueueEntry for every row whose status is 'Abort'."""
    records = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    return [HostQueueEntry(row=record) for record in records]
mbligh36768f02008-02-22 18:28:33 +0000133
def remove_file_or_dir(path):
    """Delete `path`, whether it is a file, a symlink or a directory tree.

    The path itself is inspected with lstat, so symlinks are not followed:
    a symlink (even one pointing at a directory, or a dangling one) is
    unlinked like a file.  Following the link would hand it to
    shutil.rmtree, which refuses to operate on symbolic links.

    Raises OSError if `path` does not exist or cannot be removed.
    """
    if stat.S_ISDIR(os.lstat(path).st_mode):
        # real directory: remove it and everything below it
        shutil.rmtree(path)
    else:
        # regular file, symlink or other non-directory entry
        os.remove(path)
141
142
class DatabaseConn:
    """Self-reconnecting wrapper around one MySQLdb connection and cursor."""

    def __init__(self):
        # seconds to wait between (re)connection attempts
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        self.connect()


    def connect(self):
        """(Re)open the connection, retrying until it succeeds.

        Connection parameters come from the AUTOTEST_WEB section of the
        global config; in testing mode the schema is forced to
        'stresstest_autotest_web'.  The connection runs in autocommit mode.
        """
        self.disconnect()

        # get global config and parse for info
        config = global_config.global_config
        section = "AUTOTEST_WEB"
        db_host = config.get_config_value(section, "host", "localhost")
        db_schema = config.get_config_value(section, "database",
                                            "autotest_web")
        if _testing_mode:
            db_schema = 'stresstest_autotest_web'
        db_user = config.get_config_value(section, "user", "autotest")
        db_pass = config.get_config_value(section, "password", "google")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(host=db_host, user=db_user,
                                            passwd=db_pass, db=db_schema)
                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                print("Can't connect to MYSQL; reconnecting")
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        """Close the connection if one is open and forget the cursor."""
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        """Run a statement and return all result rows.

        Arguments are passed straight to the cursor's execute().  On
        MySQLdb.OperationalError the connection is re-established after
        `reconnect_wait` seconds and the statement is retried, indefinitely.
        """
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                print("MYSQL connection died; reconnecting")
                time.sleep(self.reconnect_wait)
                self.connect()
mbligh36768f02008-02-22 18:28:33 +0000201
202
def parse_results(results_dir, flags=""):
    """Start the tko parser on `results_dir` in the background.

    Does nothing in testing mode.  The parser's stdout and stderr are
    redirected to '.parse.log' inside `results_dir`; `flags` is inserted
    verbatim into the shell command line.
    """
    if _testing_mode:
        return
    parser_path = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    log_path = os.path.join(results_dir, '.parse.log')
    command = "%s %s -r -o %s > %s 2>&1 &" % (parser_path, flags,
                                              results_dir, log_path)
    os.system(command)
209
210
def log_stacktrace(reason):
    """Report the exception currently being handled.

    The report (reason, host name, PID, timestamp and the formatted
    traceback) is written to stderr and, when a notification address is
    configured, e-mailed through the SMTP server on localhost.

    A failure to deliver the e-mail is logged to stderr rather than raised.
    This function is called from exception handlers, and letting a mail
    error escape would replace the original error and could take the
    scheduler down.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    report = "EXCEPTION: %s\n" % reason
    report += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                  time.strftime("%X %x"))
    report += ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    sys.stderr.write("\n%s\n" % report)

    if not _notify_email:
        return

    sender = "monitor_db"
    subject = "monitor_db exception"
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, report)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(sender, _notify_email, msg)
        finally:
            mailer.quit()
    except (socket.error, smtplib.SMTPException):
        sys.stderr.write("Failed to send notification email:\n%s\n" %
                         traceback.format_exc())
228
229
class Dispatcher:
    """Owns the list of running Agents and drives them from tick().

    Each tick looks for queue entries to abort and for idle hosts with
    pending work (both skipped while shutting down), then advances every
    agent by one step and drops the agents that have finished.
    """

    # Upper bounds on how many entries a single tick() may start / abort.
    MAX_STARTED_PER_TICK = 100
    MAX_ABORTED_PER_TICK = 50

    def __init__(self, do_recover=True):
        """Create an empty dispatcher; optionally recover lost state first."""
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        """Stop taking new work and tick until all agents have finished."""
        print("Shutting down!")
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        """Run one scheduling pass."""
        if not self.shutting_down:
            self._find_aborting()
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        """Register `agent` and point its `dispatcher` attribute at self."""
        self._agents.append(agent)
        agent.dispatcher = self


    def get_agents(self, queue_entry):
        """Return the agents whose queue_entry_ids contain queue_entry.id."""
        return [agent for agent in self._agents
                if queue_entry.id in agent.queue_entry_ids]


    def remove_agent(self, agent):
        """Unregister `agent`; raises ValueError if it is not registered."""
        self._agents.remove(agent)


    def _recover_lost(self):
        """Bring the database back to a consistent state at start-up.

        1. Entries that are active but not complete (and not being aborted)
           are requeued and their results directories cleared; for a
           synchronous job every entry of the job is requeued.
        2. Entries in 'Abort'/'Aborting' are marked 'Aborted' and, if they
           have a host, that host is rebooted and verified.
        3. Every other unlocked host that is not 'Ready' gets verified.
        """
        rows = _db.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete AND status != 'Abort' AND status != 'Aborting'""")
        for queue_entry in [HostQueueEntry(row=i) for i in rows]:
            job = queue_entry.job
            if job.is_synchronous():
                for child_entry in job.get_host_queue_entries():
                    child_entry.requeue()
            else:
                queue_entry.requeue()
            queue_entry.clear_results_dir()

        rebooting_host_ids = []
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        for queue_entry in [HostQueueEntry(row=i) for i in rows]:
            queue_host = queue_entry.get_host()
            if queue_host is None:
                # No host was ever assigned to this entry, so there is
                # nothing to reboot; RebootTask(None) would crash on
                # host.hostname.
                queue_entry.set_status('Aborted')
                continue
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host=queue_host)
            self.add_agent(Agent(tasks=[reboot_task, verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')
            rebooting_host_ids.append(queue_host.id)

        rows = _db.execute("""SELECT * FROM hosts
                              WHERE status != 'Ready' AND NOT locked""")
        for host in [Host(row=i) for i in rows]:
            if host.id in rebooting_host_ids:
                # already covered by the reboot+verify agent above
                continue
            self.add_agent(Agent(tasks=[VerifyTask(host=host)]))


    def _find_more_work(self):
        """Start the next queue entry on every idle host.

        Stops early once MAX_STARTED_PER_TICK agents have been started.  An
        entry whose run() raises is marked 'Failed', the error is logged,
        and the next candidate for that host is tried.
        """
        print("finding work")

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if not tasks:
                continue
            for entry in tasks:
                try:
                    agent = entry.run(assigned_host=host)
                    if agent:
                        self.add_agent(agent)

                        num_started += 1
                        if num_started >= self.MAX_STARTED_PER_TICK:
                            return
                    break
                except:
                    # Deliberately broad: one bad entry must not stop the
                    # scheduling pass.  The host is left untouched here.
                    entry.set_status('Failed')
                    log_stacktrace("task_id = %d" % entry.id)


    def _find_aborting(self):
        """Schedule the abort of queue entries whose status is 'Abort'.

        If agents are working on the entry, an AbortTask that stops them is
        queued; otherwise the entry is marked 'Aborted' right away.  When
        the entry has a host, the host is then rebooted and verified.  At
        most MAX_ABORTED_PER_TICK entries are processed per call.
        """
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()

            # An entry that never got a host has nothing to reboot or
            # verify; building RebootTask(None) would raise.
            tasks = []
            if entry_host is not None:
                tasks = [RebootTask(entry_host),
                         VerifyTask(host=entry_host)]

            if agents_to_abort:
                tasks.insert(0, AbortTask(entry, agents_to_abort))
            else:
                entry.set_status('Aborted')
                if entry_host is not None:
                    # just to make sure this host does not get
                    # taken away
                    entry_host.set_status('Rebooting')

            if tasks:
                self.add_agent(Agent(tasks=tasks,
                                     queue_entry_ids=[entry.id]))
            num_aborted += 1
            if num_aborted >= self.MAX_ABORTED_PER_TICK:
                break


    def _handle_agents(self):
        """Tick every agent and keep only those that are not done."""
        still_running = []
        # Iterate the live list: agents registered while ticking (through
        # add_agent) are picked up in the same pass.
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print("agent finished")
        self._agents = still_running
371
372
class RunMonitor(object):
    """Launches a command as a child process and tracks its exit status."""

    def __init__(self, cmd, nice_level = None, log_file = None):
        """Start `cmd` (an argument list) immediately.

        nice_level -- if truthy, the command is run under `nice -n <level>`
        log_file   -- if given, the child's stdout and stderr are appended
                      to it; otherwise they go to /dev/null
        """
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        """Spawn `cmd` and return the subprocess.Popen object."""
        if self.nice_level:
            cmd = ['nice', '-n', str(self.nice_level)] + list(cmd)

        out_file = None
        if self.log_file:
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except (IOError, OSError):
                # Logging is best effort: the command still runs, with its
                # output discarded if the file could not be opened.
                print("Warning: unable to write to log file %s" %
                      self.log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        try:
            print("cmd = %s" % cmd)
            print("path = %s" % os.getcwd())

            proc = subprocess.Popen(cmd, stdout=out_file,
                                    stderr=subprocess.STDOUT,
                                    stdin=in_devnull)
        finally:
            # The child holds its own copies of the descriptors; ours are
            # closed even if the spawn fails.
            out_file.close()
            in_devnull.close()
        return proc


    def kill(self):
        """Ask the process to terminate, waiting up to 4 x 5 seconds.

        Returns as soon as the process is found to have exited, and
        immediately if it had already exited.  SIGTERM is sent before each
        5 second wait.  If the process is still alive afterwards an error
        is printed; no stronger signal is sent.
        """
        for attempt in range(4):
            if self.proc.poll() is not None:
                return
            try:
                os.kill(self.proc.pid, signal.SIGTERM)
            except OSError:
                # e.g. the process exited between poll() and kill(); the
                # next poll() decides.
                pass
            time.sleep(5)

        # Check that the process was terminated
        if self.proc.poll() is not None:
            return

        print("""Error: process %d has not terminated""" %
              self.proc.pid)


    def exit_code(self):
        """Return the exit status, or None while the process is running."""
        return self.proc.poll()
424
425
class Agent(object):
    """Runs a queue of AgentTasks one after another.

    The dispatcher calls tick() repeatedly; each call either polls the
    active task or, once that has finished, moves on to the next one.
    """

    def __init__(self, tasks, queue_entry_ids=None):
        """
        tasks           -- iterable of tasks to run, in order
        queue_entry_ids -- ids of the queue entries this agent works on
                           (defaults to a fresh empty list per agent
                           rather than one list shared by all agents)
        """
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        if queue_entry_ids is None:
            queue_entry_ids = []
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        """Append `task` to the queue and make this agent its owner."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """Poll the running task, or start the next one if none is running."""
        print("agent tick")
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        """Retire the finished task (if any) and start the next queued one."""
        print("agent picking task")
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        """Rebuild the queue after the active task failed.

        The task's failure_tasks come first.  The tasks that were still
        queued are kept after them only if the failed task has
        clear_queue_on_failure set to False.
        """
        old_queue = self.queue
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)
        if not self.active_task.clear_queue_on_failure:
            while not old_queue.empty():
                self.add_task(old_queue.get_nowait())

    def is_done(self):
        """True when no task is active and none is queued."""
        return self.active_task is None and self.queue.empty()


    def start(self):
        """Start the first task; the agent must already have a dispatcher."""
        assert self.dispatcher

        self._next_task()
482
mblighd5c95802008-03-05 00:33:46 +0000483
class AgentTask(object):
    """One unit of work for an Agent, usually a monitored child process.

    Subclasses customise behaviour through prolog() (before the command is
    launched) and epilog() (after `success` has been set).
    """

    def __init__(self, cmd, failure_tasks=None,
                 clear_queue_on_failure=True):
        """\
        By default, on failure, the Agent's task queue is cleared and
        replaced with the tasks in failure_tasks.  If
        clear_queue_on_failure=False, the task queue will not be
        cleared, and the tasks in failure_tasks will be inserted at the
        beginning of the queue.

        cmd is the argument list to execute; a false value means no
        process is launched.  failure_tasks defaults to a fresh empty list
        per task rather than one list shared by all tasks.
        """
        self.done = False
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.clear_queue_on_failure = clear_queue_on_failure
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        # Unknown until finished() runs.  Defined up front so that a task
        # ended through abort() still has the attribute.
        self.success = None


    def poll(self):
        """Check on the child process; a task without one finishes as failed."""
        print("poll")
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        """Finish the task once `exit_code` is known (0 means success)."""
        if exit_code is None:
            # still running
            return
        self.finished(exit_code == 0)


    def is_done(self):
        """True once the task has finished or been aborted."""
        return self.done


    def finished(self, success):
        """Record the outcome and run the epilog hook."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook called before the command is launched; no-op by default."""
        pass


    def epilog(self):
        """Hook called after the task finished; no-op by default."""
        pass


    def start(self):
        """Run prolog() and launch the command, on the first call only."""
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the child process, if any, and mark the task as done.

        epilog() is not called and `success` is left unchanged.
        """
        if self.monitor:
            self.monitor.kill()
        self.done = True


    def run(self):
        """Launch `cmd` under a RunMonitor, if there is a command.

        When the task has a `host` attribute the output is appended to
        <RESULTS_DIR>/hosts/<hostname>.
        """
        if self.cmd:
            print("agent starting monitor")

            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)

            self.monitor = RunMonitor(self.cmd,
                                      nice_level=AUTOSERV_NICE_LEVEL,
                                      log_file=log_file)
567
568
class RepairTask(AgentTask):
    """Runs `autoserv -R -m <hostname>` and records the outcome on the host.

    A failed repair does not clear the agent's remaining task queue.
    """

    def __init__(self, host):
        self.host = host
        repair_cmd = [_autoserv_path, '-R', '-m', host.hostname]
        AgentTask.__init__(self, repair_cmd, clear_queue_on_failure=False)


    def prolog(self):
        """Mark the host as 'Repairing' before autoserv starts."""
        print("repair_task starting")
        self.host.set_status('Repairing')


    def epilog(self):
        """Set the host status to 'Repair Succeeded' or 'Repair Failed'."""
        outcome = 'Repair Failed'
        if self.success:
            outcome = 'Repair Succeeded'

        self.host.set_status(outcome)
588
589
class VerifyTask(AgentTask):
    """Runs `autoserv -v` against a host, into a temporary results dir.

    Exactly one of queue_entry / host must be given.  On failure the agent
    continues with the tasks returned by get_failure_tasks().
    """

    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        # The scratch directory and the command that refers to it are
        # created in prolog(), i.e. only when the task really starts.
        # Failure tasks are built eagerly and usually never run; creating
        # the directory here would leave one behind for each of them.
        self.temp_results_dir = None

        failure_tasks = self.get_failure_tasks()

        AgentTask.__init__(self, None, failure_tasks=failure_tasks)


    def get_failure_tasks(self):
        """Tasks to run if verify fails: repair, then re-verify.

        Meant to be overridden by subclasses.
        """
        return [RepairTask(self.host),
                ReverifyTask(self.queue_entry, self.host)]


    def prolog(self):
        """Create the scratch dir, build the command and flag the statuses."""
        print("starting verify on %s" % (self.host.hostname))
        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        self.cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
                    '-r', self.temp_results_dir]
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def epilog(self):
        """Keep or discard the results, then dispatch on the outcome.

        Results are moved into the queue entry's verify results dir when
        there is a queue entry and either verify succeeded or the entry is
        not a meta-host entry.  The scratch directory is removed in every
        case, including when moving the results raises.
        """
        try:
            if self.queue_entry and (self.success or
                                     not self.queue_entry.meta_host):
                self.move_results()
        finally:
            if self.temp_results_dir:
                shutil.rmtree(self.temp_results_dir)

        if self.success:
            self.on_success()
        else:
            self.on_failure()


    def on_success(self):
        """Verify passed: the host is 'Ready'."""
        self.host.set_status('Ready')


    def on_failure(self):
        """Verify failed: flag the host and requeue the entry, if any."""
        self.host.set_status('Failed Verify')
        if self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        """Move everything from the scratch dir to the verify results dir."""
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        for filename in os.listdir(self.temp_results_dir):
            self.force_move(os.path.join(self.temp_results_dir, filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print('Warning: removing existing destination file ' +
                  dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)
666
667
class ReverifyTask(VerifyTask):
    """Verify pass that runs after a repair attempt.

    It has no failure tasks of its own.  If it fails, the host is marked
    'Repair Failed' and, for a non-metahost queue entry, the entry's
    handle_host_failure() is called.
    """

    def __init__(self, queue_entry=None, host=None):
        self.fail_queue_entry = None
        if queue_entry:
            host = host or queue_entry.host
            if not queue_entry.meta_host:
                self.fail_queue_entry = queue_entry
        # always construct VerifyTask without the queue_entry - we don't
        # want to touch the queue entry unless we fail, in which case we
        # just fail it (and only if it's a non-metahost)
        VerifyTask.__init__(self, host=host)


    def get_failure_tasks(self):
        """A failed re-verify is final: nothing further is scheduled."""
        return []


    def on_failure(self):
        """Give up on the host and fail the remembered queue entry, if any."""
        self.host.set_status('Repair Failed')
        if self.fail_queue_entry:
            self.fail_queue_entry.handle_host_failure()
mblighe2586682008-02-29 22:45:46 +0000690
691
class VerifySynchronousTask(VerifyTask):
    """Verify step for one entry of a synchronous job.

    After a successful verify the entry becomes 'Pending'; once the job
    reports it is ready, the job is started.
    """

    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry=queue_entry)


    def on_success(self):
        """Do the regular success handling, then the pending handling."""
        VerifyTask.on_success(self)
        self.on_pending()


    def on_pending(self):
        """Mark the entry 'Pending' and start the job if it is ready."""
        entry = self.queue_entry
        if entry.job.num_complete() > 0:
            # some other entry failed verify, and we've
            # already been marked as stopped
            return

        entry.set_status('Pending')
        job = entry.job
        if not job.is_ready():
            return
        agent = job.run(entry)
        self.agent.dispatcher.add_agent(agent)
713
714
class QueueTask(AgentTask):
    """Runs the job command for one or more queue entries."""

    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        """Append '<field>=<value as integer>' to the entry's keyval file.

        The file is closed even if formatting or writing fails.
        """
        key_path = os.path.join(queue_entry.results_dir(), 'keyval')
        keyval_file = open(key_path, 'a')
        try:
            keyval_file.write('%s=%d\n' % (field, value))
        finally:
            keyval_file.close()


    def prolog(self):
        """Record start timestamps and mark entries and hosts 'Running'."""
        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.queue_entries[0], "job_queued", queued)
        self._write_keyval(self.queue_entries[0], "job_started",
                           started)
        for queue_entry in self.queue_entries:
            print("starting queue_task on %s/%s" %
                  (queue_entry.host.hostname, queue_entry.id))
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        """Record the outcome, release the hosts and start result parsing.

        For a synchronous or single-machine job the job's results dir is
        parsed once the job is finished; otherwise each entry's results dir
        is parsed with the '-l 2' flag.
        """
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.queue_entries[0], "job_finished",
                           finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags='-l 2')

        print("queue_task finished with %s/%s" % (status, self.success))
769
770
class RebootTask(AgentTask):
    """Runs `autoserv -b -m <hostname> /dev/null` to reboot a host.

    A failed reboot does not clear the agent's remaining task queue.
    """

    def __init__(self, host):
        self.host = host
        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '/dev/null']
        AgentTask.__init__(self, self.cmd, clear_queue_on_failure=False)


    def prolog(self):
        """Mark the host as 'Rebooting' before autoserv starts."""
        print("starting reboot task for host: %s" % self.host.hostname)
        self.host.set_status("Rebooting")

    def epilog(self):
        """Set the host status to 'Reboot Succeeded' or 'Reboot Failed'."""
        outcome = "Reboot Failed"
        if self.success:
            outcome = "Reboot Succeeded"
        self.host.set_status(outcome)
794
795
class AbortTask(AgentTask):
    """Aborts the active tasks of the agents working on a queue entry.

    Constructing the task already removes those agents from their
    dispatcher.  The task has no command of its own and always ends up
    successful.
    """

    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for victim in agents_to_abort:
            victim.dispatcher.remove_agent(victim)
        AgentTask.__init__(self, '')


    def prolog(self):
        """Mark the entry as 'Aborting'."""
        print("starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id))
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        """Mark the entry 'Aborted' and report success."""
        self.queue_entry.set_status('Aborted')
        self.success = True

    def run(self):
        """Abort whatever task each doomed agent is currently running."""
        for victim in self.agents_to_abort:
            if victim.active_task:
                victim.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +0000819
820
class DBObject(object):
    """Base class mapping one database row onto instance attributes.

    Subclasses supply the table name through _get_table() and the ordered
    column list through `fields`; every column becomes an attribute.
    """

    def __init__(self, fields, id=None, row=None, new_record=False):
        """Load the object from `row`, or from the database by `id`.

        Exactly one of id / row must be given.  `new_record` marks the
        object as not stored yet, so that save() inserts it.

        Raises Exception if no row with the given id exists.
        """
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                # A real exception instead of a string: string exceptions
                # are rejected by Python 2.6 and later.
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for name, value in zip(fields, row):
            self.__dict__[name] = value
            self.__valid_fields[name] = True

        # the id must never be changed through update_field()
        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        """Return the name of the table backing this class."""
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table = None):
        """Return the number of rows matching the SQL condition `where`.

        `where` (and `table`, which defaults to this object's table) are
        inserted into the statement verbatim, so they must be trusted SQL.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
            """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        """Return the number of columns, including id."""
        return len(self.__fields)


    def update_field(self, field, value):
        """Set one column in the database and on the object.

        Nothing is written when the value is unchanged.  `field` must be a
        known column other than 'id'.
        """
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """Insert the object as a new row if it was created as a new record.

        All columns except id are written.  The values are passed as query
        parameters so the database driver does the quoting, rather than
        being pasted into the statement between double quotes.
        """
        if not self.__new_record:
            return
        keys = self.__fields[1:] # avoid id
        columns = ','.join([str(key) for key in keys])
        placeholders = ','.join(['%s'] * len(keys))
        values = tuple([self.__dict__[key] for key in keys])
        query = 'INSERT INTO %s (%s) VALUES (%s)' % \
                (self.__table, columns, placeholders)
        _db.execute(query, values)


    def delete(self):
        """Delete this object's row."""
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        """Yield an instance for every row matching the condition `where`.

        `where` is inserted into the statement verbatim (trusted SQL only).
        """
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)
908
mbligh36768f02008-02-22 18:28:33 +0000909
class IneligibleHostQueue(DBObject):
    """Row of the 'ineligible_host_queues' table (a job_id/host_id pair)."""

    def __init__(self, id=None, row=None, new_record=None):
        columns = ['id', 'job_id', 'host_id']
        DBObject.__init__(self, columns, id=id, row=row,
                          new_record=new_record)


    @classmethod
    def _get_table(cls):
        """Return the backing table name."""
        return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +0000920
921
class Host(DBObject):
    """Row of the 'hosts' table."""

    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        """Return the backing table name."""
        return 'hosts'


    def current_task(self):
        """Return the host's active, incomplete queue entry, or None.

        At most one such entry is expected (asserted).
        """
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def next_queue_entries(self):
        """Return a one-element list with the next entry to run, or None.

        Candidates are pending (neither complete nor active) entries that
        are either assigned to this host, or are meta-host entries for one
        of the host's labels whose job is not listed as ineligible for this
        host.  The best one by priority (descending), then meta_host, then
        id is returned.  A locked host gets no work.
        """
        if self.locked:
            print("%s locked, not queuing" % self.hostname)
            return None
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        return [HostQueueEntry(row=i) for i in rows]

    def yield_work(self):
        """Requeue the entry currently running on this host, if any."""
        print("%s yielding work" % self.hostname)
        # Query once: a second lookup could return None if the entry
        # finished in between, and would cost another round trip.
        task = self.current_task()
        if task:
            task.requeue()

    def set_status(self, status):
        """Store `status` in the host's status column."""
        self.update_field('status', status)
979 self.update_field('status',status)
980
981
class HostQueueEntry(DBObject):
    """
    A row of the 'host_queue_entries' table: one job queued against one
    host, or against a meta-host that is bound to a real host when run.

    On construction the entry also loads its Job, its Host (when host_id
    is set) and computes the path of its queue log inside the job's
    results directory.
    """

    def __init__(self, id=None, row=None):
        """
        Build an entry from a database id or an already fetched row; one
        of the two must be given.
        """
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, fields, id=id, row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        """Name of the database table backing this class."""
        return 'host_queue_entries'


    def set_host(self, host):
        """
        Assign `host` to this entry, or release the current host when
        `host` is false.

        Assigning records the host id, marks the entry active and blocks
        the host for this job.  Releasing removes that block and clears
        host_id; it reads self.host.id, so a host must currently be set.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the Host assigned to this entry (may be None)."""
        return self.host


    def queue_log_record(self, log_line):
        """
        Append `log_line`, prefixed with the current timestamp, to this
        entry's queue log file.
        """
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        try:
            queue_log.write(now + ' ' + log_line + '\n')
        finally:
            # Release the handle even when the write fails.
            queue_log.close()


    def block_host(self, host_id):
        """
        Save a new IneligibleHostQueue record pairing this entry's job
        with `host_id`.
        """
        print("creating block %s/%s" % (self.job.id, host_id))
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """
        Delete the IneligibleHostQueue record for this entry's job and
        `host_id`.  Asserts that exactly one such record exists.
        """
        print("removing block %s/%s" % (self.job.id, host_id))
        blocks = list(IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id)))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        """
        Directory for this entry's results: the job directory for a
        synchronous or single-machine job, otherwise a per-host
        subdirectory of it (which requires a host to be assigned).
        """
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        """
        Results directory used for verification: a per-host subdirectory
        of the job directory for a synchronous or multi-machine job
        (which requires a host to be assigned), otherwise the job
        directory itself.
        """
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        """
        Store `status` and bring the 'complete' and 'active' flags in
        line with it:

          Queued                                  -> not complete, not active
          Pending/Running/Verifying/Starting/
          Abort/Aborting                          -> not complete, active
          Failed/Completed/Stopped/Aborted        -> complete, not active

        Any other status leaves both flags untouched.
        """
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print("%s/%d status -> %s" % (hostname, self.id, self.status))

        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self, assigned_host=None):
        """
        Start this entry and return whatever Job.run returns for it.

        A meta-host entry must be given `assigned_host`, which is bound
        to the entry before the job is run.
        """
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print("%s/%s scheduled on %s, status=%s" % (self.job.name,
              self.meta_host, self.host.hostname, self.status))

        return self.job.run(queue_entry=self)


    def requeue(self):
        """
        Put the entry back into the 'Queued' state; a meta-host entry
        also releases its host.
        """
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """
        Called when this queue entry's host has failed verification and
        repair.

        Marks the entry 'Failed'; for a synchronous job all of the job's
        incomplete entries are stopped as well.  Must not be called on a
        meta-host entry.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None):
        """
        Remove everything inside `results_dir` (default: this entry's
        results_dir()) except files whose name contains 'queue.log'.
        Does nothing if the directory does not exist.
        """
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        for filename in os.listdir(results_dir):
            # keep the queue logs; they are written by queue_log_record
            if 'queue.log' in filename:
                continue
            path = os.path.join(results_dir, filename)
            remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001123
1124
class Job(DBObject):
    """
    A row of the 'jobs' table, plus the helpers needed to turn the job
    into an Agent that runs autoserv.

    job_dir is '<RESULTS_DIR>/<id>-<owner>'.
    """

    def __init__(self, id=None, row=None):
        """
        Build a Job from a database id or an already fetched row; one of
        the two must be given.
        """
        assert id or row
        DBObject.__init__(self,
                          ['id', 'owner', 'name', 'priority',
                           'control_file', 'control_type', 'created_on',
                           'synch_type', 'synch_count', 'synchronizing'],
                          id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        """Name of the database table backing this class."""
        return 'jobs'


    def is_server_job(self):
        """True unless control_type is 2."""
        return self.control_type != 2


    def get_host_queue_entries(self):
        """
        Return a HostQueueEntry for every host_queue_entries row of this
        job.  Asserts that the job has at least one entry.
        """
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
            """, (self.id,))
        entries = [HostQueueEntry(row=row) for row in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """
        Store `status` on the job; with update_queues also set it on
        every queue entry of the job.
        """
        # NOTE(review): 'status' is not in the field list given to
        # DBObject.__init__ above - confirm update_field accepts it.
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        """True when synch_type is 2."""
        return self.synch_type == 2


    def is_ready(self):
        """
        An asynchronous job is always ready.  A synchronous job is ready
        once the number of its 'Pending' entries equals synch_count.
        """
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        """
        Heuristic: True once at least half of synch_count entries are
        'Pending'.
        """
        pending = 0
        for queue_entry in self.get_host_queue_entries():
            if queue_entry.status == 'Pending':
                pending += 1

        # Force true division: with two ints Python 2 floors the
        # quotient, which made the ratio 0 until every entry was pending.
        return (float(pending) / self.synch_count >= 0.5)


    def start_synchronizing(self):
        """Set the job's 'synchronizing' flag."""
        self.update_field('synchronizing', True)


    def results_dir(self):
        """Return the job's results directory (job_dir)."""
        return self.job_dir


    def num_machines(self, clause = None):
        """
        Count this job's host_queue_entries rows, optionally restricted
        by the extra SQL condition `clause`.
        """
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries that are not complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of entries that are active."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of entries that are complete."""
        return self.num_machines('complete')


    def is_finished(self):
        """True when no entry of the job is left incomplete."""
        left = self.num_queued()
        print("%s: %s machines left" % (self.name, left))
        return left == 0


    def stop_synchronizing(self):
        """
        Clear the 'synchronizing' flag and set the job (but not its
        queue entries) back to 'Queued'.
        """
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        """Set every entry that is not yet complete to 'Stopped'."""
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        """
        Append the hostname of `queue_entry`'s host to the '.machines'
        file in the job directory.
        """
        hostname = queue_entry.get_host().hostname
        print("writing %s to job %s machines file" % (hostname, self.id))
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        try:
            mf.write("%s\n" % hostname)
        finally:
            # Release the handle even when the write fails.
            mf.close()


    def create_results_dir(self, queue_entry=None):
        """
        Make sure the job directory exists.

        Returns queue_entry.results_dir() when a queue entry is given,
        otherwise the job directory.
        """
        print("create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete()))

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        """
        Build the Agent that runs this job for `queue_entry`.

        A synchronous job that is not ready yet only gets an Agent with
        a VerifySynchronousTask for the entry.  Otherwise the entry is
        set to 'Starting', the control file is written to a temporary
        file and an Agent is returned holding a QueueTask with the
        autoserv command line (preceded by a VerifyTask for asynchronous
        jobs).  A synchronous job runs on all of its entries at once, an
        asynchronous one only on `queue_entry`.
        """
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry = queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        # mkstemp creates the file atomically; os.tmpnam() only returned
        # a name and left a window for symlink attacks before open().
        # The file is not removed here because autoserv reads it later.
        ctrl_fd, ctrl_path = tempfile.mkstemp()
        ctrl = os.fdopen(ctrl_fd, 'w')
        try:
            ctrl.write(self.control_file or "")
        finally:
            ctrl.close()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl_path]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        ids = [entry.id for entry in queue_entries]

        return Agent(tasks, ids)
1301
1302
# Start the scheduler only when this file is executed as a script.
if __name__ == "__main__":
    main()