blob: 61e20cb0f4e59cc224ed9069c3632375fac84a51 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighe2586682008-02-29 22:45:46 +00009import optparse, signal, smtplib, socket, datetime, stat
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Directory that job results are written under; main() replaces this with
# the results_dir command-line argument.
RESULTS_DIR = '.'
# Niceness handed to RunMonitor when AgentTask launches a command.
AUTOSERV_NICE_LEVEL = 10

# Root of the autotest tree: $AUTOTEST_DIR when set, otherwise the parent of
# the directory holding this file.  (dict.has_key() is deprecated; use get.)
AUTOTEST_PATH = os.environ.get(
    'AUTOTEST_DIR', os.path.join(os.path.dirname(__file__), '..'))
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make modules in the server directory importable.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Module-wide state, filled in by main()/init().
_db = None                      # DatabaseConn instance created by init()
_shutdown = False               # set to True by handle_sigint()
_notify_email = None            # --notify address used by log_stacktrace()
_autoserv_path = 'autoserv'     # switched to 'autoserv_dummy' by --test
_testing_mode = False           # True when --test was given
mbligh36768f02008-02-22 18:28:33 +000030
31
def main():
    """
    Command-line entry point.

    Parses the options, records them in the module globals, calls init()
    and then ticks a Dispatcher every 20 seconds until _shutdown is set.
    Any exception escaping the loop is reported through log_stacktrace();
    the database connection is closed before returning.
    """
    global RESULTS_DIR, _notify_email, _autoserv_path, _testing_mode

    parser = optparse.OptionParser('usage: %prog [options] results_dir')
    parser.add_option(
        '--no-recover', action='store_true',
        help='Skip machine/job recovery '
             'step [for multiple monitors/rolling upgrades]')
    parser.add_option(
        '--logfile',
        help='Set a log file that all stdout '
             'should be redirected to.  Stderr will go to this '
             'file + ".err"')
    parser.add_option(
        '--notify',
        help='Set an email address to be '
             'notified of exceptions')
    parser.add_option(
        '--test', action='store_true',
        help='Indicate that scheduler is under '
             'test and should use dummy autoserv and no parsing')

    options, args = parser.parse_args()
    if len(args) != 1:
        # exactly one positional argument (the results directory) is required
        parser.print_usage()
        return

    RESULTS_DIR = args[0]
    _notify_email = options.notify

    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher(do_recover=not options.no_recover)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
        dispatcher.shut_down()
    except:
        # top-level boundary: report whatever went wrong, then fall through
        # to the disconnect below
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +000076
77
def handle_sigint(signum, frame):
    """
    Signal handler (installed for SIGINT by init()): announce the request
    and set the module-level _shutdown flag that main()'s loop polls.
    Both arguments are ignored.
    """
    global _shutdown
    print("Shutdown request received.")
    _shutdown = True
82
83
def init(logfile):
    """
    One-time start-up: optionally redirect output to logfile, prepend the
    autotest server directory to PATH, open the database connection into
    the global _db and install the SIGINT handler.
    """
    global _db

    if logfile:
        enable_logging(logfile)

    started_at = time.strftime("%X %x")
    print("%s> dispatcher starting" % started_at)
    print("My PID is %d" % os.getpid())

    os.environ['PATH'] = ':'.join([AUTOTEST_SERVER_DIR, os.environ['PATH']])
    _db = DatabaseConn()

    print("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    print("Connected! Running...")
98
99
def enable_logging(logfile):
    """
    Redirect stdout to logfile and stderr to logfile + ".err".

    Both files are opened unbuffered in append mode.  The redirection is
    done at the file-descriptor level (os.dup2) and by rebinding
    sys.stdout / sys.stderr.
    """
    err_file = "%s.err" % logfile
    print("Enabling logging to %s (%s)" % (logfile, err_file))

    out_log = open(logfile, "a", buffering=0)
    err_log = open(err_file, "a", buffering=0)

    # point the existing descriptors at the log files first ...
    for log, stream in ((out_log, sys.stdout), (err_log, sys.stderr)):
        os.dup2(log.fileno(), stream.fileno())

    # ... then swap the Python-level stream objects
    sys.stdout, sys.stderr = out_log, err_log
112
113
def idle_hosts():
    """
    Return Host objects for hosts that could take new work: unlocked, with
    status NULL or 'Ready', no active queue entry, and at least one
    not-complete, not-active queue entry either assigned to the host
    directly or to a label (meta_host) the host carries.
    """
    query = """
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """
    return [Host(row=record) for record in _db.execute(query)]
126
127
def remove_file_or_dir(path):
    """
    Delete path, whatever it is.

    Real directories are removed recursively; everything else (regular
    files and symbolic links, including links that point at a directory or
    at nothing) is unlinked.  Raises OSError if path does not exist.
    """
    # os.path.isdir() follows symlinks, and shutil.rmtree() refuses to
    # operate on a symlink, so links must take the os.remove() branch.
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)
    else:
        os.remove(path)
135
136
class DatabaseConn:
    """
    Wrapper around a MySQLdb connection and cursor that keeps retrying,
    sleeping reconnect_wait seconds between attempts, whenever MySQLdb
    raises OperationalError.
    """

    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        self.connect()


    def connect(self):
        """
        Drop any existing connection and block until a new one is open.

        Connection settings come from the AUTOTEST_WEB section of the
        global config; in testing mode the schema name is overridden.
        """
        self.disconnect()

        config = global_config.global_config
        section = "AUTOTEST_WEB"
        db_host = config.get_config_value(section, "host", "localhost")
        db_schema = config.get_config_value(section, "database",
                                            "autotest_web")
        if _testing_mode:
            db_schema = 'stresstest_autotest_web'
        db_user = config.get_config_value(section, "user", "autotest")
        db_pass = config.get_config_value(section, "password", "google")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(host=db_host, user=db_user,
                                            passwd=db_pass, db=db_schema)
                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                print("Can't connect to MYSQL; reconnecting")
                time.sleep(self.reconnect_wait)
                # discard a half-initialised connection before retrying
                self.disconnect()


    def disconnect(self):
        """Close the connection if one is open and forget the cursor."""
        if self.conn:
            self.conn.close()
        self.conn = self.cur = None


    def execute(self, *args, **dargs):
        """
        Run cursor.execute(*args, **dargs) and return fetchall().

        On OperationalError the call sleeps, reconnects and retries; there
        is no retry limit.
        """
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                print("MYSQL connection died; reconnecting")
                time.sleep(self.reconnect_wait)
                self.connect()
mbligh36768f02008-02-22 18:28:33 +0000195
196
def parse_results(results_dir, flags=""):
    """
    Launch the tko 'parse' script on results_dir in the background, with
    its output redirected to <results_dir>/.parse.log.  flags is inserted
    verbatim into the command line.  Does nothing in testing mode.
    """
    if _testing_mode:
        return

    parser_path = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    log_path = os.path.join(results_dir, '.parse.log')
    # NOTE: the command goes through the shell unquoted; the trailing '&'
    # backgrounds it so the scheduler does not wait for the parser.
    command = "%s %s -r -o %s > %s 2>&1 &" % (
        parser_path, flags, results_dir, log_path)
    os.system(command)
203
204
def log_stacktrace(reason):
    """
    Write a report about the exception currently being handled to stderr
    and, when _notify_email is set, mail the same report through the SMTP
    server on localhost.

    The report contains reason, the hostname, the PID, a timestamp and the
    formatted traceback.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()

    report = "EXCEPTION: %s\n" % reason
    report += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                  time.strftime("%X %x"))
    report += ''.join(traceback.format_exception(exc_type, exc_value,
                                                 exc_tb))

    sys.stderr.write("\n%s\n" % report)

    if not _notify_email:
        return

    sender = "monitor_db"
    subject = "monitor_db exception"
    message = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, report)
    mailer = smtplib.SMTP('localhost')
    mailer.sendmail(sender, _notify_email, message)
    mailer.quit()
222
223
class Dispatcher:
    """
    Owns the list of running Agents.  Each tick() looks for new work
    (unless shutting down) and then advances every agent one step.
    """

    def __init__(self, do_recover=True):
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        """Stop taking new work and keep ticking until all agents finish."""
        print("Shutting down!")
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        """One scheduling pass: find work if still accepting, run agents."""
        if not self.shutting_down:
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        """Register agent with this dispatcher."""
        agent.dispatcher = self
        self._agents.append(agent)


    def _recover_lost(self):
        """
        Clean up after a previous run: requeue queue entries that are
        still marked active and not complete (all entries of the job for
        synchronous jobs), clear their results, and schedule a verify for
        every unlocked host whose status is not 'Ready'.
        """
        rows = _db.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete""")
        # build every entry before touching any of them
        lost_entries = [HostQueueEntry(row=record) for record in rows]
        for lost in lost_entries:
            job = lost.job
            if job.is_synchronous():
                for sibling in job.get_host_queue_entries():
                    sibling.requeue()
            else:
                lost.requeue()
            lost.clear_results_dir()

        rows = _db.execute("""SELECT * FROM hosts
                              WHERE status != 'Ready' AND NOT locked""")
        stale_hosts = [Host(row=record) for record in rows]
        for stale in stale_hosts:
            self.add_agent(Agent(tasks=[VerifyTask(host=stale)]))


    def _find_more_work(self):
        """
        Start the next queue entry on each idle host, stopping after 100
        agents have been started in this pass.  An entry whose run()
        raises is marked 'Failed' and the failure is logged.
        """
        print("finding work")

        started = 0
        for host in idle_hosts():
            candidates = host.next_queue_entries()
            if not candidates:
                continue
            for entry in candidates:
                try:
                    agent = entry.run(assigned_host=host)
                    if agent:
                        self.add_agent(agent)

                        started += 1
                        if started >= 100:
                            return
                    # first entry that runs without raising wins this host
                    break
                except:
                    entry.set_status('Failed')
                    log_stacktrace("task_id = %d" % entry.id)


    def _handle_agents(self):
        """Tick every agent and drop the ones that report being done."""
        unfinished = []
        for agent in self._agents:
            agent.tick()
            if agent.is_done():
                print("agent finished")
            else:
                unfinished.append(agent)
        self._agents = unfinished
309
310
class RunMonitor(object):
    """
    Starts cmd as a child process on construction and exposes its Popen
    object as self.proc.

    cmd is an argument list; when nice_level is truthy it is prefixed with
    ['nice', '-n', <level>].  The child's stdout and stderr are appended
    to log_file, or sent to /dev/null when no log file is given or it
    cannot be opened.  stdin is /dev/null.
    """

    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)


    def run(self, cmd):
        """
        Launch cmd and return the subprocess.Popen object.

        Raises whatever subprocess.Popen raises (e.g. OSError when the
        executable cannot be started); the file handles opened here are
        closed in that case too.
        """
        if self.nice_level:
            cmd = ['nice', '-n', str(self.nice_level)] + list(cmd)

        out_file = None
        if self.log_file:
            try:
                out_file = open(self.log_file, 'a')
                banner = '*' * 80
                out_file.write("\n%s\n" % banner)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % banner)
                # push the header out before the child starts writing to
                # the same file, otherwise it can end up after the
                # child's output
                out_file.flush()
            except EnvironmentError:
                # logging is best effort: an unusable log file must not
                # stop the command from running
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        try:
            in_devnull = open('/dev/null', 'r')
            try:
                print("cmd = %s" % (cmd,))
                print("path = %s" % os.getcwd())

                return subprocess.Popen(cmd, stdout=out_file,
                                        stderr=subprocess.STDOUT,
                                        stdin=in_devnull)
            finally:
                # the child holds its own copies of the descriptors
                in_devnull.close()
        finally:
            out_file.close()


    def kill(self):
        """Kill the child process (Popen.kill needs Python 2.6+)."""
        self.proc.kill()


    def exit_code(self):
        """Return the child's exit status, or None while it is running."""
        return self.proc.poll()
353
354
class Agent(object):
    """
    Runs a FIFO queue of tasks one at a time.  The dispatcher calls tick()
    repeatedly; each call either polls the active task or moves on to the
    next one.
    """

    def __init__(self, tasks):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        """Append task to the queue and make this agent its owner."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """Poll the running task, or advance if there is none running."""
        print("agent tick")
        current = self.active_task
        if current and not current.is_done():
            current.poll()
            return
        self._next_task()


    def _next_task(self):
        """
        Retire the finished active task (applying its failure handling if
        it did not succeed) and start the next queued task, if any.
        """
        print("agent picking task")
        finished = self.active_task
        if finished:
            assert finished.is_done()

            if not finished.success:
                self.on_task_failure()

        self.active_task = None
        if self.is_done():
            return
        self.active_task = self.queue.get_nowait()
        if self.active_task:
            self.active_task.start()


    def on_task_failure(self):
        """
        Rebuild the queue after the active task failed: its failure_tasks
        go first; the previously queued tasks follow only when the task
        has clear_queue_on_failure set to False.
        """
        pending = self.queue
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)
        if self.active_task.clear_queue_on_failure:
            return
        while not pending.empty():
            self.add_task(pending.get_nowait())


    def is_done(self):
        """True when nothing is running and nothing is queued."""
        return self.active_task is None and self.queue.empty()


    def start(self):
        """Begin work; the agent must already belong to a dispatcher."""
        assert self.dispatcher

        self._next_task()
410
411
class AgentTask(object):
    """
    One step of an Agent's work, normally an external command run through
    RunMonitor.  Subclasses customise behaviour via prolog() and epilog().
    """

    def __init__(self, cmd, failure_tasks = None,
                 clear_queue_on_failure=True):
        """\
        By default, on failure, the Agent's task queue is cleared and
        replaced with the tasks in failure_tasks.  If
        clear_queue_on_failure=False, the task queue will not be
        cleared, and the tasks in failure_tasks will be inserted at the
        beginning of the queue.

        cmd is the argument list to execute; a false value means there is
        nothing to run.  failure_tasks defaults to a fresh empty list per
        instance.
        """
        if failure_tasks is None:
            # never share one default list between task instances
            failure_tasks = []
        self.done = False
        self.failure_tasks = failure_tasks
        self.clear_queue_on_failure = clear_queue_on_failure
        self.started = False
        self.cmd = cmd
        self.agent = None


    def poll(self):
        """
        Check on the running command.  A task that never started a
        monitor (no command) is finished immediately as a failure.
        """
        print("poll")
        if hasattr(self, 'monitor'):
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        """Finish the task once exit_code is known; 0 means success."""
        if exit_code is None:
            # command still running
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Record the outcome, then run the epilog() hook."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook called once before the command is started."""
        pass


    def epilog(self):
        """Hook called once after the task has finished."""
        pass


    def start(self):
        """Run prolog() and the command; repeated calls do nothing."""
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the running command (requires that one was started)."""
        self.monitor.kill()


    def run(self):
        """
        Start self.cmd under RunMonitor.  When the task has a host
        attribute, output is logged to <RESULTS_DIR>/hosts/<hostname>.
        """
        if self.cmd:
            print("agent starting monitor")

            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)

            self.monitor = RunMonitor(self.cmd,
                                      nice_level = AUTOSERV_NICE_LEVEL,
                                      log_file = log_file)
491
492
class RepairTask(AgentTask):
    """
    Runs autoserv with -R against a host.  A failed repair leaves the
    agent's remaining queue in place (clear_queue_on_failure=False).
    """

    def __init__(self, host):
        self.host = host
        repair_cmd = [_autoserv_path, '-R', '-m', host.hostname]
        AgentTask.__init__(self, repair_cmd, clear_queue_on_failure=False)


    def prolog(self):
        print("repair_task starting")
        self.host.set_status('Repairing')


    def epilog(self):
        # record the outcome on the host
        outcome = 'Repair Failed'
        if self.success:
            outcome = 'Repair Succeeded'

        self.host.set_status(outcome)
512
513
class VerifyTask(AgentTask):
    """
    Runs autoserv with -v against a host, writing results to a temporary
    directory.  Exactly one of queue_entry and host must be given.  On
    failure the agent continues with the tasks from get_failure_tasks().
    """

    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        verify_cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
                      '-r', self.temp_results_dir]

        AgentTask.__init__(self, verify_cmd,
                           failure_tasks=self.get_failure_tasks())


    def get_failure_tasks(self):
        'To be overridden'
        repair = RepairTask(self.host)
        reverify = ReverifyTask(self.queue_entry, self.host)
        return [repair, reverify]


    def prolog(self):
        print("starting verify on %s" % (self.host.hostname))
        entry = self.queue_entry
        if entry:
            entry.set_status('Verifying')
            entry.clear_results_dir(entry.verify_results_dir())
        self.host.set_status('Verifying')


    def epilog(self):
        # results are kept for a queue entry when verify passed, or when
        # the entry is not a meta-host entry
        entry = self.queue_entry
        if entry and (self.success or not entry.meta_host):
            self.move_results()
        shutil.rmtree(self.temp_results_dir)

        if self.success:
            self.on_success()
        else:
            self.on_failure()


    def on_success(self):
        self.host.set_status('Ready')


    def on_failure(self):
        self.host.set_status('Failed Verify')
        if self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        """Move everything from the temp dir into the entry's verify dir."""
        assert self.queue_entry is not None
        destination = self.queue_entry.verify_results_dir()
        if not os.path.exists(destination):
            os.makedirs(destination)
        for filename in os.listdir(self.temp_results_dir):
            self.force_move(
                os.path.join(self.temp_results_dir, filename),
                os.path.join(destination, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print('Warning: removing existing destination file ' +
                  dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)
590
591
class ReverifyTask(VerifyTask):
    """
    Verify run with no failure tasks of its own.  If it fails, the host is
    marked 'Repair Failed' and a non-metahost queue entry is failed too.
    """

    def __init__(self, queue_entry=None, host=None):
        self.fail_queue_entry = None
        if queue_entry:
            host = host or queue_entry.host
            if not queue_entry.meta_host:
                self.fail_queue_entry = queue_entry
        # always construct VerifyTask without the queue_entry - we don't
        # want to touch the queue entry unless we fail, in which case we
        # just fail it (and only if it's a non-metahost)
        VerifyTask.__init__(self, host=host)


    def get_failure_tasks(self):
        # nothing further is attempted after a failed re-verify
        return []


    def on_failure(self):
        self.host.set_status('Repair Failed')
        entry = self.fail_queue_entry
        if entry:
            entry.handle_host_failure()
mblighe2586682008-02-29 22:45:46 +0000614
615
class VerifySynchronousTask(VerifyTask):
    """
    Verify step for an entry of a synchronous job: after a successful
    verify the entry becomes 'Pending', and the job is started once
    job.is_ready() says so.
    """

    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry=queue_entry)


    def on_success(self):
        VerifyTask.on_success(self)
        self.on_pending()


    def on_pending(self):
        entry = self.queue_entry
        if entry.job.num_complete() > 0:
            # some other entry failed verify, and we've
            # already been marked as stopped
            return

        entry.set_status('Pending')
        job = entry.job
        if not job.is_ready():
            return
        self.agent.dispatcher.add_agent(job.run(entry))
637
638
class QueueTask(AgentTask):
    """
    Runs the command for a job on behalf of one or more queue entries,
    recording timestamps in the keyval file and kicking off result
    parsing when done.
    """

    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        """Append 'field=value' (value as an integer) to the keyval file."""
        keyval_path = os.path.join(queue_entry.results_dir(), 'keyval')
        keyval_file = open(keyval_path, 'a')
        keyval_file.write('%s=%d' % (field, value) + '\n')
        keyval_file.close()


    def prolog(self):
        # write some job timestamps into the job keyval file
        first_entry = self.queue_entries[0]
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(first_entry, "job_queued", queued)
        self._write_keyval(first_entry, "job_started", started)

        for entry in self.queue_entries:
            print("starting queue_task on %s/%s" % (entry.host.hostname,
                                                    entry.id))
            entry.set_status('Running')
            entry.host.set_status('Running')

        multi_machine_async = (not self.job.is_synchronous() and
                               self.job.num_machines() > 1)
        if multi_machine_async:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        status = 'Failed'
        if self.success:
            status = 'Completed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.queue_entries[0], "job_finished",
                           finished)

        for entry in self.queue_entries:
            entry.set_status(status)
            entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            # one shared results directory: parse once the job is finished
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for entry in self.queue_entries:
                parse_results(entry.results_dir(), flags='-l 2')

        print("queue_task finished with %s/%s" % (status, self.success))
693
694
class RebootTask(AgentTask):
    """
    Placeholder for a task that reboots a host through autoserv -b.
    Running it is not implemented: tick() and run() always raise.
    """

    def __init__(self, host):
        # The constructor used to take no arguments and then read an
        # undefined name `host`, so it raised NameError on every call;
        # host has to be supplied by the caller.
        # NOTE(review): host is formatted straight into the command, so it
        # presumably is a hostname string - confirm before implementing
        # run().
        cmd = "%s -b -m %s /dev/null" % (_autoserv_path, host)
        AgentTask.__init__(self, cmd)
        self.host = host


    def tick(self, exit_code):
        # string exceptions are not valid; raise a real exception class
        raise NotImplementedError("not implemented")


    def run(self):
        raise NotImplementedError("not implemented")
708
709
710
class DBObject(object):
    """
    Base class mapping one database row onto an object.

    Subclasses name their table through _get_table() and pass the ordered
    column list as fields; each column value becomes an attribute of the
    same name.  The first column is expected to be 'id'.
    """

    def __init__(self, fields, id=None, row=None, new_record=False):
        """
        fields: ordered list of column names, matching the row layout.
        id: primary key to load from the database (when row is not given).
        row: sequence of column values to use directly.
        new_record: when true, save() inserts the row into the table.

        Exactly one of id and row must be given.  Raises ValueError when
        no row with the given id exists.
        """
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                # was a string exception, which cannot be raised on
                # current interpreters
                raise ValueError("row not found (table=%s, id=%s)" %
                                 (self.__table, id))
            row = rows[0]

        assert len(row)==len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for name, value in zip(fields, row):
            self.__dict__[name] = value
            self.__valid_fields[name] = True

        # the primary key may not be changed through update_field()
        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        """Return the name of the table backing this class."""
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table = None):
        """
        Return the number of rows matching the SQL condition `where`, in
        `table` or, by default, in this object's own table.  Both strings
        are inserted into the query verbatim.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        """
        Set one column both in the database and on this object.  Nothing
        is written when the value is unchanged.  Raises KeyError for a
        name that is not an updatable column.
        """
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
            (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """
        Insert this object as a new row (every column except the leading
        id).  Does nothing unless it was created with new_record set.
        """
        if self.__new_record:
            keys = self.__fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            # let the driver quote the values instead of pasting them
            # into the statement as "..." strings
            placeholders = ','.join(['%s'] * len(keys))
            query = 'INSERT INTO %s (%s) VALUES (%s)' % \
                (self.__table, columns, placeholders)
            values = tuple(self.__dict__[key] for key in keys)
            _db.execute(query, values)


    def delete(self):
        """Delete this object's row from the table."""
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        """
        Generate one instance of cls per row matching the SQL condition
        `where` (inserted into the query verbatim).
        """
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)
798
mbligh36768f02008-02-22 18:28:33 +0000799
class IneligibleHostQueue(DBObject):
    """Row of the ineligible_host_queues table: a (job_id, host_id) pair."""

    def __init__(self, id=None, row=None, new_record=None):
        columns = ['id', 'job_id', 'host_id']
        DBObject.__init__(self, columns, id=id, row=row,
                          new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +0000810
811
class Host(DBObject):
    """Row of the hosts table, plus queries for the host's queue entries."""

    def __init__(self, id=None, row=None):
        columns = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, columns, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        """
        Return the active, not-complete HostQueueEntry for this host, or
        None.  At most one such entry may exist.
        """
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None

        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def next_queue_entries(self):
        """
        Return a list holding the best waiting queue entry for this host
        (the query is limited to one row), or None when the host is
        locked or nothing is waiting.

        Candidates are entries assigned to this host, or meta-host entries
        for one of its labels whose job has not been blocked for this
        host; highest priority first.
        """
        if self.locked:
            print("%s locked, not queuing" % self.hostname)
            return None

        host_id = self.id
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (host_id, host_id, host_id))

        if len(rows) == 0:
            return None
        return [HostQueueEntry(row=record) for record in rows]


    def yield_work(self):
        """Requeue whatever entry is currently active on this host."""
        print("%s yielding work" % self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self, status):
        self.update_field('status', status)
870
871
class HostQueueEntry(DBObject):
    """
    Row of the host_queue_entries table: one job scheduled on one host, or
    on any host carrying the meta_host label.  Loading an entry also loads
    its Job and, when assigned, its Host.
    """

    def __init__(self, id=None, row=None):
        assert id or row
        columns = ['id', 'job_id', 'host_id', 'priority', 'status',
                   'meta_host', 'active', 'complete']
        DBObject.__init__(self, columns, id=id, row=row)

        self.job = Job(self.job_id)

        self.host = None
        if self.host_id:
            self.host = Host(self.host_id)

        log_name = 'queue.log.' + str(self.id)
        self.queue_log_path = os.path.join(self.job.results_dir(), log_name)


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    def set_host(self, host):
        """
        Assign host to this entry (marking it active and blocking the
        host for this job), or release the current host when host is
        false.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log."""
        stamp = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(stamp + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        """Record that host_id must not be offered this job again."""
        print("creating block %s/%s" % (self.job.id, host_id))
        # id 0 is a placeholder; save() leaves the id column out
        block = IneligibleHostQueue(row=[0, self.job.id, host_id],
                                    new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove the single block recorded for this job and host_id."""
        print("removing block %s/%s" % (self.job.id, host_id))
        condition = 'job_id=%d and host_id=%d' % (self.job.id, host_id)
        blocks = list(IneligibleHostQueue.fetch(condition))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        """
        Directory for this entry's results: the job directory for
        synchronous or single-machine jobs, otherwise a per-host
        subdirectory of it.
        """
        job = self.job
        if job.is_synchronous() or job.num_machines() == 1:
            return job.job_dir

        assert self.host
        return os.path.join(job.job_dir, self.host.hostname)


    def verify_results_dir(self):
        """
        Directory for verify results: a per-host subdirectory of the job
        directory for synchronous or multi-machine jobs, otherwise the
        job directory itself.
        """
        job = self.job
        if job.is_synchronous() or job.num_machines() > 1:
            assert self.host
            return os.path.join(job.job_dir, self.host.hostname)

        return job.job_dir


    def set_status(self, status):
        """
        Store status and bring the complete/active flags in line with it.
        Statuses not listed below leave the flags untouched.
        """
        self.update_field('status', status)

        hostname = 'no host'
        if self.host:
            hostname = self.host.hostname
        print("%s/%d status -> %s" % (hostname, self.id, self.status))

        # status -> (complete, active)
        flags_for = {
            'Queued': (False, False),
            'Pending': (False, True),
            'Running': (False, True),
            'Verifying': (False, True),
            'Starting': (False, True),
            'Failed': (True, False),
            'Completed': (True, False),
            'Stopped': (True, False),
        }
        if status in flags_for:
            complete, active = flags_for[status]
            self.update_field('complete', complete)
            self.update_field('active', active)


    def run(self, assigned_host=None):
        """
        Start this entry through its job and return what job.run() gives
        back.  A meta-host entry is first bound to assigned_host.
        """
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print("%s/%s scheduled on %s, status=%s" % (self.job.name,
                self.meta_host, self.host.hostname, self.status))

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Put the entry back in the queue; meta-host entries drop their host."""
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None):
        """
        Delete everything in results_dir (default: this entry's results
        directory) except names containing 'queue.log'.  A missing
        directory is ignored.
        """
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        for filename in os.listdir(results_dir):
            if 'queue.log' not in filename:
                remove_file_or_dir(os.path.join(results_dir, filename))
mbligh36768f02008-02-22 18:28:33 +00001012
1013
class Job(DBObject):
    """
    A row of the 'jobs' table plus the logic that launches the job.

    job_dir is the job's results directory, RESULTS_DIR/<id>-<owner>.
    """

    def __init__(self, id=None, row=None):
        """Load the job either by id or from an already-fetched row."""
        assert id or row
        DBObject.__init__(self,
                          ['id', 'owner', 'name', 'priority',
                           'control_file', 'control_type', 'created_on',
                           'synch_type', 'synch_count', 'synchronizing'],
                          id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR,
                                    "%s-%s" % (self.id, self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        """Any control_type other than 2 counts as a server job."""
        return self.control_type != 2


    def get_host_queue_entries(self):
        """
        Return a HostQueueEntry for every host_queue_entries row of this
        job.  A job is expected to have at least one entry.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """
        Update the job's status; with update_queues, also set the same
        status on every host queue entry of the job.
        """
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        """A synch_type of 2 marks the job as synchronous."""
        return self.synch_type == 2


    def is_ready(self):
        """
        Asynchronous jobs are always ready.  A synchronous job is ready
        once the number of 'Pending' entries equals synch_count.
        """
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        """
        Heuristic: True once at least half of synch_count entries are
        'Pending'.
        """
        pending = 0
        for queue_entry in self.get_host_queue_entries():
            if queue_entry.status == 'Pending':
                pending += 1

        # Compare without dividing: '/' on two ints truncates under
        # Python 2, which silently turned the 0.5 threshold into "all
        # entries pending", and a synch_count of 0 raised
        # ZeroDivisionError.
        return pending * 2 >= self.synch_count


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir


    def num_machines(self, clause = None):
        """
        Count this job's host_queue_entries rows, optionally restricted
        by an extra SQL condition (clause).
        """
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries that are not complete."""
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        """True when no entry of the job is left incomplete."""
        left = self.num_queued()
        print("%s: %s machines left" % (self.name, left))
        return left == 0


    def stop_synchronizing(self):
        """
        Clear the synchronizing flag and set the job (but not its queue
        entries) back to 'Queued'.
        """
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        """Mark every entry that is not yet complete as 'Stopped'."""
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to <job_dir>/.machines."""
        hostname = queue_entry.get_host().hostname
        print("writing %s to job %s machines file" % (hostname, self.id))
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        try:
            mf.write("%s\n" % hostname)
        finally:
            # close even if the write fails so the handle is not leaked
            mf.close()


    def create_results_dir(self, queue_entry=None):
        """
        Make sure the job directory exists.

        Returns the queue entry's results_dir() when a queue entry is
        given, otherwise the job directory.
        """
        print("create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete()))

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def _write_control_file(self):
        """Write control_file to a fresh temporary file; return its path."""
        # mkstemp creates and opens the file atomically under a unique
        # name.  os.tmpnam() only returned a name, leaving a window in
        # which another process could create (or symlink) that path.
        ctrl_fd, ctrl_path = tempfile.mkstemp()
        ctrl = os.fdopen(ctrl_fd, 'w')
        try:
            if self.control_file:
                ctrl.write(self.control_file)
        finally:
            # closing flushes the data and releases the descriptor; the
            # file itself stays on disk for the command built in run()
            ctrl.close()
        return ctrl_path


    def run(self, queue_entry):
        """
        Build the Agent that runs this job for queue_entry.

        A synchronous job that is not ready yet gets an Agent holding
        only a VerifySynchronousTask.  Otherwise the entry is set to
        'Starting', the control file is written to a temporary file and
        an autoserv command line is assembled.  Synchronous jobs run on
        the hosts of all their entries; asynchronous jobs run on the
        given entry's host, preceded by a VerifyTask.
        """
        results_dir = self.create_results_dir(queue_entry)
        synchronous = self.is_synchronous()

        if synchronous and not self.is_ready():
            return Agent([VerifySynchronousTask(queue_entry = queue_entry)])

        queue_entry.set_status('Starting')

        ctrl_path = self._write_control_file()

        if synchronous:
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl_path]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not synchronous:
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        return Agent(tasks)
1184
1185
# Call main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()