#!/usr/bin/python -u

"""
Autotest scheduler
"""
__author__ = "Paul Turner <pjt@google.com>"

import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
import optparse, signal, smtplib, socket, datetime, stat
from common import global_config

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
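

# Overall structure: main() parses options, opens the database connection
# and then loops forever calling Dispatcher.tick().  Each tick the
# Dispatcher aborts queue entries marked 'Abort', hands queued work to
# idle hosts, and polls its running Agents.  An Agent is a small state
# machine holding a FIFO of AgentTasks (verify, repair, reboot, abort,
# run job); most tasks shell out to autoserv through RunMonitor, and
# finished job results are handed to the tko parser via parse_results().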
def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--no-recover', help='Skip machine/job recovery ' +
                      'step [for multiple monitors/rolling upgrades]',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher(do_recover = not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def idle_hosts():
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id
                    WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready')""")
    hosts = [Host(row=i) for i in rows]
    return hosts


def queue_entries_to_abort():
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)
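

# DatabaseConn is a thin wrapper around MySQLdb: it reads the AUTOTEST_WEB
# connection settings from the global config, autocommits every statement,
# and transparently reconnects (retrying every reconnect_wait seconds)
# whenever MySQL reports an OperationalError.  As a rough illustration,
# the corresponding global_config section might look something like the
# following (the values here are only the code's defaults, not real
# credentials):
#
#     [AUTOTEST_WEB]
#     host: localhost
#     user: autotest
#     password: google
#     database: autotest_web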
class DatabaseConn:
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        self.connect()


    def connect(self):
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host", "localhost")
        DB_SCHEMA = c.get_config_value(dbase, "database", "autotest_web")

        global _testing_mode
        if _testing_mode:
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user", "autotest")
        DB_PASS = c.get_config_value(dbase, "password", "google")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(host=DB_HOST, user=DB_USER,
                                            passwd=DB_PASS, db=DB_SCHEMA)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                print "Can't connect to MySQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                print "MySQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()


def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)


def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    os.system(generate_parse_command(results_dir, flags))


def log_stacktrace(reason):
    (type, value, tb) = sys.exc_info()
    str = "EXCEPTION: %s\n" % reason
    str += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                               time.strftime("%X %x"))
    str += ''.join(traceback.format_exception(type, value, tb))

    sys.stderr.write("\n%s\n" % str)

    if _notify_email:
        sender = "monitor_db"
        subject = "monitor_db exception"
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            sender, _notify_email, subject, str)
        mailer = smtplib.SMTP('localhost')
        mailer.sendmail(sender, _notify_email, msg)
        mailer.quit()
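

# The Dispatcher is the scheduler's main loop body.  One tick aborts any
# queue entries marked 'Abort' (capped at 50 per tick), assigns queued
# work to idle hosts (capped at 100 new agents per tick) and then polls
# every live Agent.  shut_down() stops scheduling new work but keeps
# ticking until all agents have finished.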
class Dispatcher:
    def __init__(self, do_recover=True):
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        print "Shutting down!"
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        if not self.shutting_down:
            self._find_aborting()
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self


    # Find the agents working on the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def _recover_lost(self):
        # requeue entries left active by a previous scheduler run
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Abort' AND status != 'Aborting'""")
        if len(rows) > 0:
            queue_entries = [HostQueueEntry(row=i) for i in rows]
            for queue_entry in queue_entries:
                job = queue_entry.job
                if job.is_synchronous():
                    for child_entry in job.get_host_queue_entries():
                        child_entry.requeue()
                else:
                    queue_entry.requeue()
                queue_entry.clear_results_dir()

        # entries that were aborting when the scheduler died get cleaned
        # up with a reboot/verify pass
        rebooting_host_ids = []
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        if len(rows) > 0:
            queue_entries = [HostQueueEntry(row=i) for i in rows]
            for queue_entry in queue_entries:
                queue_host = queue_entry.get_host()
                reboot_task = RebootTask(queue_host)
                verify_task = VerifyTask(host = queue_host)
                self.add_agent(Agent(tasks=[reboot_task, verify_task],
                                     queue_entry_ids=[queue_entry.id]))
                queue_entry.set_status('Aborted')
                # Secure the host from being picked up
                queue_host.set_status('Rebooting')
                rebooting_host_ids.append(queue_host.id)

        # any other host in an unknown state gets re-verified
        rows = _db.execute("""SELECT * FROM hosts
                              WHERE status != 'Ready' AND NOT locked""")
        if len(rows) > 0:
            hosts = [Host(row=i) for i in rows]
            for host in hosts:
                if host.id in rebooting_host_ids:
                    continue
                verify_task = VerifyTask(host = host)
                self.add_agent(Agent(tasks = [verify_task]))


    def _find_more_work(self):
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started >= 100:
                                return
                        break
                    except:
                        next.set_status('Failed')

                        # if next.host:
                        #     next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
            # just to make sure this host does not get taken away
            entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running
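

# RunMonitor owns a single child process: it optionally wraps the command
# in 'nice', appends a timestamped banner to a per-host log file, and
# exposes a poll()-based exit_code() plus a kill() that sends SIGTERM and
# waits up to four times five seconds for the process to die.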
class RunMonitor(object):
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)


    def run(self, cmd):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except:
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        out_file.close()
        in_devnull.close()
        return proc


    def kill(self):
        # send SIGTERM and give the process a few chances to exit
        for i in range(0, 4):
            if self.proc.poll() == None:
                os.kill(self.proc.pid, signal.SIGTERM)
                time.sleep(5)
            # Check that the process was terminated
            if self.proc.poll() != None:
                return

        print ("""Error: process %d has not terminated""" %
               self.proc.pid)


    def exit_code(self):
        return self.proc.poll()
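

# An Agent is a FIFO of AgentTasks attached to a Dispatcher.  tick()
# polls the currently running task and advances to the next one when it
# finishes; if a task fails, the remaining queue is thrown away and
# replaced by that task's failure_tasks (e.g. a failed verify queues a
# repair).  queue_entry_ids records which host_queue_entries the agent is
# working on so the Dispatcher can find it when an abort arrives.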
class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[]):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        # drop the remaining queue and replace it with the failed
        # task's failure_tasks
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_done(self):
        return self.active_task == None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


class AgentTask(object):
    def __init__(self, cmd, failure_tasks = []):
        """\
        On failure, the Agent's task queue is cleared and replaced
        with the tasks in failure_tasks.
        """
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code == None:
            return
        # print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def epilog(self):
        pass


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True


    def run(self):
        if self.cmd:
            print "agent starting monitor"

            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(os.path.join(RESULTS_DIR, 'hosts'),
                                        self.host.hostname)

            self.monitor = RunMonitor(self.cmd,
                                      nice_level = AUTOSERV_NICE_LEVEL,
                                      log_file = log_file)
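

# The concrete tasks below each wrap one autoserv invocation:
#   RepairTask  -> autoserv -R -m <host>
#   VerifyTask  -> autoserv -v -m <host> -r <tempdir>
#   RebootTask  -> autoserv -b -m <host> /dev/null
# QueueTask runs the actual job command built in Job.run(), and AbortTask
# kills the agents currently working on a queue entry.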
class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        cmd = [_autoserv_path, '-R', '-m', host.hostname]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        AgentTask.__init__(self, cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        AgentTask.__init__(self, cmd, failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def epilog(self):
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        shutil.rmtree(self.temp_results_dir)

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            self.force_move(os.path.join(self.temp_results_dir, filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' + dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            self.queue_entry.set_status('Pending')
            job = self.queue_entry.job
            if job.is_ready():
                agent = job.run(self.queue_entry)
                self.agent.dispatcher.add_agent(agent)
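

# QueueTask is the task that actually runs autoserv for a job.  Its
# prolog drops a .parse.cmd file into each results directory, records
# job_queued/job_started keyvals and marks entries and hosts as Running;
# its epilog records job_finished, marks entries Completed or Failed and
# kicks off the tko parser.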
class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        key_path = os.path.join(queue_entry.results_dir(), 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def prolog(self):
        # write the parser commands into the results directories
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            results_dir = self.job.results_dir()
            cmdfile = os.path.join(results_dir, '.parse.cmd')
            cmd = generate_parse_command(results_dir)
            print >> open(cmdfile, 'w'), cmd
        else:
            for queue_entry in self.queue_entries:
                results_dir = queue_entry.results_dir()
                cmdfile = os.path.join(results_dir, '.parse.cmd')
                cmd = generate_parse_command(results_dir, '-l 2')
                print >> open(cmdfile, 'w'), cmd

        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.queue_entries[0], "job_queued", queued)
        self._write_keyval(self.queue_entries[0], "job_started", started)

        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (
                queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')

        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.queue_entries[0], "job_finished", finished)

        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)


class RebootTask(AgentTask):
    def __init__(self, host):
        global _autoserv_path

        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '/dev/null']
        self.host = host
        AgentTask.__init__(self, self.cmd,
                           failure_tasks=[RepairTask(host)])


    def prolog(self):
        print "starting reboot task for host: %s" % self.host.hostname
        self.host.set_status("Rebooting")


class AbortTask(AgentTask):
    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for agent in agents_to_abort:
            agent.dispatcher.remove_agent(agent)
        AgentTask.__init__(self, '')


    def prolog(self):
        print "starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id)
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        for agent in self.agents_to_abort:
            if (agent.active_task):
                agent.active_task.abort()
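

# DBObject is a minimal object/row mapper.  Subclasses supply the table
# name via _get_table() and a field list; the matching row columns become
# attributes, update_field() writes changes straight back to the database
# and save() inserts new records.  A sketch of typical use (the id value
# is made up for illustration):
#
#     host = Host(id=42)            # load row 42 from the hosts table
#     host.set_status('Ready')      # UPDATE hosts SET status=... WHERE id=42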
class DBObject(object):
    def __init__(self, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table = None):
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
            """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        if self.__new_record:
            keys = self.__fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, values)
            _db.execute(query)


    def delete(self):
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)


class IneligibleHostQueue(DBObject):
    def __init__(self, id=None, row=None, new_record=None):
        fields = ['id', 'job_id', 'host_id']
        DBObject.__init__(self, fields, id=id, row=row,
                          new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'


class Host(DBObject):
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0]
            # print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
        # print "%s/%s looking for work" % (self.hostname, self.platform_id)
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        else:
            return [HostQueueEntry(row=i) for i in rows]


    def yield_work(self):
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()


    def set_status(self, status):
        self.update_field('status', status)
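

# A HostQueueEntry is one host's slot in a job.  set_status() keeps the
# active/complete flags in sync with the status string: 'Queued' is
# neither active nor complete; 'Pending', 'Running', 'Verifying',
# 'Starting', 'Abort' and 'Aborting' are active; 'Failed', 'Completed',
# 'Stopped' and 'Aborted' are complete.  Meta-host entries are bound to a
# concrete host via set_host(), which also blocks that host from picking
# up the same job again through ineligible_host_queues.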
class HostQueueEntry(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, fields, id=id, row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(now + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        print "creating block %s/%s" % (self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        print "removing block %s/%s" % (self.job.id, host_id)
        blocks = list(IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id)))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)

        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self, assigned_host=None):
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
            self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None):
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        for filename in os.listdir(results_dir):
            if 'queue.log' in filename:
                continue
            path = os.path.join(results_dir, filename)
            remove_file_or_dir(path)
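

# A Job row describes what to run: synch_type == 2 marks a synchronous
# (all hosts run together) job, control_type selects server- vs.
# client-side control files, and job_dir is where results land.  run()
# writes the control file to a temp file, builds the autoserv command
# line and wraps it (plus a verify step for asynchronous jobs) in an
# Agent.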
class Job(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,
                          ['id', 'owner', 'name', 'priority',
                           'control_file', 'control_type', 'created_on',
                           'synch_type', 'synch_count', 'synchronizing'],
                          id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        return self.control_type != 2


    def get_host_queue_entries(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id=%s
            """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic: at least half of the entries are Pending
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        # float division so the 50% threshold works with integer counts
        return (float(count) / self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir


    def num_machines(self, clause = None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left == 0


    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        print "writing %s to job %s machines file" % (hostname, self.id)
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        mf.write("%s\n" % queue_entry.get_host().hostname)
        mf.close()


    def create_results_dir(self, queue_entry=None):
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry = queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        ids = []
        for entry in queue_entries:
            ids.append(entry.id)

        agent = Agent(tasks, ids)

        return agent


if __name__ == '__main__':
    main()