blob: e48052ed3c373577dfbd666e0f4480cccb9bbe63 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighe2586682008-02-29 22:45:46 +00009import optparse, signal, smtplib, socket, datetime, stat
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Root directory for job and host results; main() replaces it with the
# results_dir command-line argument.
RESULTS_DIR = '.'
# `nice -n` level used for every autoserv run spawned through AgentTask.run().
AUTOSERV_NICE_LEVEL = 10

# The autotest tree defaults to the parent of this file's directory; the
# AUTOTEST_DIR environment variable overrides it when present.
AUTOTEST_PATH = os.environ.get('AUTOTEST_DIR',
                               os.path.join(os.path.dirname(__file__), '..'))
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make the server-side modules importable.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Process-wide scheduler state, filled in by main() and init().
_db = None                      # DatabaseConn created by init()
_shutdown = False               # set by handle_sigint() to stop the main loop
_notify_email = None            # --notify address for exception reports
_autoserv_path = 'autoserv'     # replaced by 'autoserv_dummy' under --test
_testing_mode = False           # --test: dummy autoserv and no parsing

31
def main():
    """\
    Entry point: parse options, initialise the process and run the
    dispatcher every 20 seconds until SIGINT requests a shutdown.

    Any exception escaping the dispatcher -- including one raised while
    recovering state in Dispatcher.__init__ -- is logged (and emailed with
    --notify); the database connection is closed in every case.
    """
    global RESULTS_DIR, _notify_email, _autoserv_path, _testing_mode

    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--no-recover', help='Skip machine/job recovery ' +
                      'step [for multiple monitors/rolling upgrades]',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    RESULTS_DIR = args[0]
    _notify_email = options.notify

    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    init(options.logfile)

    try:
        try:
            # Recovery runs database queries, so it belongs inside the
            # handler: a failure there must be reported like any other.
            dispatcher = Dispatcher(do_recover=not options.no_recover)
            while not _shutdown:
                dispatcher.tick()
                time.sleep(20)
            dispatcher.shut_down()
        except Exception:
            log_stacktrace("Uncaught exception; terminating monitor_db")
    finally:
        # Runs even if log_stacktrace() itself fails.
        _db.disconnect()

77
def handle_sigint(signum, frame):
    """\
    SIGINT handler: flag the main loop in main() to stop after the current
    tick instead of dying mid-operation.
    """
    global _shutdown
    _shutdown = True
    print("Shutdown request received.")
82
83
def init(logfile):
    """\
    One-time process setup: optional stdout/stderr redirection, putting the
    autotest server directory first on PATH, opening the global database
    connection and installing the SIGINT handler.
    """
    global _db

    if logfile:
        enable_logging(logfile)
    started_at = time.strftime("%X %x")
    print("%s> dispatcher starting" % started_at)
    print("My PID is %d" % os.getpid())

    current_path = os.environ['PATH']
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + current_path
    _db = DatabaseConn()

    print("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    print("Connected! Running...")
98
99
def enable_logging(logfile):
    """\
    Append stdout to `logfile` and stderr to `logfile` + '.err', both
    unbuffered.

    The process's file descriptors behind sys.stdout/sys.stderr are
    re-pointed with os.dup2(), and the Python-level sys.stdout/sys.stderr
    objects are then replaced by the new file objects.
    """
    out_path = logfile
    err_path = "%s.err" % logfile
    print("Enabling logging to %s (%s)" % (out_path, err_path))
    out_stream = open(out_path, "a", buffering=0)
    err_stream = open(err_path, "a", buffering=0)

    redirections = ((out_stream, sys.stdout), (err_stream, sys.stderr))
    for new_stream, old_stream in redirections:
        os.dup2(new_stream.fileno(), old_stream.fileno())

    sys.stdout, sys.stderr = out_stream, err_stream
112
113
def idle_hosts():
    """\
    Return Host objects for hosts that could start work now: unlocked, with
    status NULL or 'Ready', not running any active queue entry, and wanted
    by at least one pending (not complete, not active) entry, either
    directly or through a label (meta_host) they carry.
    """
    query = """
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
                (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
                OR
                (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                        INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """
    return [Host(row=host_row) for host_row in _db.execute(query)]
126
127
def remove_file_or_dir(path):
    """\
    Delete `path`, recursively if it is a real directory.

    os.lstat() is used instead of os.stat() so that symlinks are examined
    themselves rather than their targets. A link to a directory is
    therefore unlinked instead of being handed to shutil.rmtree(), which
    must not be given a symlink. A dangling link is also removed instead of
    making os.stat() raise.
    """
    if stat.S_ISDIR(os.lstat(path).st_mode):
        shutil.rmtree(path)
    else:
        # regular file, symlink or any other non-directory entry
        os.remove(path)
135
136
class DatabaseConn:
    """\
    Wrapper around a single MySQLdb connection/cursor to the AUTOTEST_WEB
    database. Connecting blocks until the server accepts, and a query that
    fails with OperationalError is retried after reconnecting, waiting
    reconnect_wait seconds between attempts.
    """

    def __init__(self):
        self.reconnect_wait = 20  # seconds between (re)connection attempts
        self.conn = None
        self.cur = None

        self.connect()


    def connect(self):
        """(Re)open the connection, retrying until it succeeds."""
        self.disconnect()

        # Connection settings come from the global config, with defaults.
        config = global_config.global_config
        section = "AUTOTEST_WEB"
        db_host = config.get_config_value(section, "host", "localhost")
        db_schema = config.get_config_value(section, "database",
                                            "autotest_web")
        if _testing_mode:
            # --test runs against a separate schema
            db_schema = 'stresstest_autotest_web'
        db_user = config.get_config_value(section, "user", "autotest")
        db_pass = config.get_config_value(section, "password", "google")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(host=db_host, user=db_user,
                                            passwd=db_pass, db=db_schema)
                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                print("Can't connect to MYSQL; reconnecting")
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        """Close the connection if one is open and forget the cursor."""
        if self.conn:
            self.conn.close()
        self.conn, self.cur = None, None


    def execute(self, *args, **dargs):
        """\
        Execute a query (arguments as for cursor.execute) and return all
        fetched rows, reconnecting and retrying on OperationalError.
        """
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                print("MYSQL connection died; reconnecting")
                time.sleep(self.reconnect_wait)
                self.connect()

196
def _shell_quote(arg):
    """Quote `arg` as a single word for a POSIX shell command line."""
    return "'" + arg.replace("'", "'\"'\"'") + "'"


def parse_results(results_dir, flags=""):
    """\
    Start the TKO parser on `results_dir` in the background, sending its
    output to `results_dir`/.parse.log. Does nothing in testing mode.

    `flags` is inserted verbatim as extra parser options (e.g. '-l 2'). The
    paths are shell-quoted because results directories embed the job owner
    (Job.job_dir), which comes from the database.
    """
    if _testing_mode:
        return
    parse = os.path.join(AUTOTEST_TKO_DIR, 'parse')
    output = os.path.join(results_dir, '.parse.log')
    command = "%s %s -r -o %s > %s 2>&1 &" % (_shell_quote(parse), flags,
                                               _shell_quote(results_dir),
                                               _shell_quote(output))
    os.system(command)
203
204
def log_stacktrace(reason):
    """\
    Report the exception currently being handled: write it to stderr,
    prefixed by `reason`, the host name, PID and time, and, when --notify
    was given, email the same report to _notify_email.

    A failure while sending the email is written to stderr instead of
    propagating, so reporting one exception cannot raise a second one out
    of the caller's except block.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    report = "EXCEPTION: %s\n" % reason
    report += "%s / %s / %s\n" % (socket.gethostname(), os.getpid(),
                                  time.strftime("%X %x"))
    report += ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    sys.stderr.write("\n%s\n" % report)

    if not _notify_email:
        return

    sender = "monitor_db"
    subject = "monitor_db exception"
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, report)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(sender, _notify_email, msg)
        finally:
            mailer.quit()
    except Exception:
        sys.stderr.write("Failed to email exception report to %s:\n%s\n" %
                         (_notify_email, traceback.format_exc()))
222
223
class Dispatcher:
    """\
    Scheduler core: keeps the list of running Agents, starts the next queue
    entry on idle hosts and advances every agent once per tick().
    """

    # Maximum number of queue entries started by one _find_more_work().
    _MAX_STARTS_PER_TICK = 100

    def __init__(self, do_recover=True):
        self._agents = []
        self.shutting_down = False

        if do_recover:
            self._recover_lost()


    def shut_down(self):
        """Stop scheduling new work and keep ticking until all agents end."""
        print("Shutting down!")
        self.shutting_down = True
        while self._agents:
            self.tick()
            time.sleep(40)


    def tick(self):
        """One pass: start new work (unless shutting down), then drive agents."""
        if not self.shutting_down:
            self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self


    def _recover_lost(self):
        """\
        Undo work left half-done by a previous scheduler process.

        Active, incomplete queue entries have their results cleared and are
        requeued; for a synchronous job every entry is requeued, once per
        job. Unlocked hosts whose status is not 'Ready' get a VerifyTask.
        """
        rows = _db.execute("""SELECT * FROM host_queue_entries WHERE active AND NOT complete""")
        queue_entries = [HostQueueEntry(row=entry_row) for entry_row in rows]
        requeued_jobs = {}
        for queue_entry in queue_entries:
            # Clear first: requeue() releases a meta-host entry's host, and
            # results_dir() needs that host for multi-machine async jobs.
            queue_entry.clear_results_dir()
            job = queue_entry.job
            if job.is_synchronous():
                # Requeue the whole job only once even if several of its
                # entries were active; a second pass would try to release
                # meta-host entries that were already released.
                if job.id not in requeued_jobs:
                    requeued_jobs[job.id] = True
                    for child_entry in job.get_host_queue_entries():
                        child_entry.requeue()
            else:
                queue_entry.requeue()

        rows = _db.execute("""SELECT * FROM hosts
            WHERE status != 'Ready' AND NOT locked""")
        hosts = [Host(row=host_row) for host_row in rows]
        for host in hosts:
            verify_task = VerifyTask(host=host)
            self.add_agent(Agent(tasks=[verify_task]))


    def _find_more_work(self):
        """\
        Start the next queue entry on each idle host, stopping after
        _MAX_STARTS_PER_TICK entries that produced an agent. An entry whose
        run() raises is marked 'Failed' and the exception is logged.
        """
        print("finding work")

        num_started = 0
        for host in idle_hosts():
            for entry in host.next_queue_entries() or []:
                try:
                    agent = entry.run(assigned_host=host)
                    if agent:
                        self.add_agent(agent)
                        num_started += 1
                        if num_started >= self._MAX_STARTS_PER_TICK:
                            return
                    break
                except Exception:
                    # NOTE: the host's own status is not reset here.
                    entry.set_status('Failed')
                    log_stacktrace("task_id = %d" % entry.id)


    def _handle_agents(self):
        """Tick every agent and drop the ones that have finished."""
        still_running = []
        for agent in self._agents:
            agent.tick()
            if agent.is_done():
                print("agent finished")
            else:
                still_running.append(agent)
        self._agents = still_running
309
310
class RunMonitor(object):
    """\
    Start a command as a child process (optionally under `nice`), with
    stdin from /dev/null and stdout+stderr appended to `log_file`, and
    report its exit status.
    """

    def __init__(self, cmd, nice_level=None, log_file=None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)


    def run(self, cmd):
        """\
        Launch `cmd` (a list of arguments) and return the Popen object.

        The parent's handles on the log file and /dev/null are closed once
        the child has them, and also if the launch fails.
        """
        if self.nice_level:
            cmd = ['nice', '-n', str(self.nice_level)] + list(cmd)

        out_file = self._open_log(cmd)
        try:
            in_devnull = open('/dev/null', 'r')
            try:
                print("cmd = %s" % cmd)
                print("path = %s" % os.getcwd())
                return subprocess.Popen(cmd, stdout=out_file,
                                        stderr=subprocess.STDOUT,
                                        stdin=in_devnull)
            finally:
                in_devnull.close()
        finally:
            out_file.close()


    def _open_log(self, cmd):
        """\
        Open self.log_file for append and write a banner for `cmd`; fall
        back to /dev/null when there is no log file or it cannot be opened.
        """
        out_file = None
        if self.log_file:
            try:
                out_file = open(self.log_file, 'a')
                banner = '*' * 80
                out_file.write("\n%s\n" % banner)
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % banner)
                # The child writes to the same descriptor directly, so the
                # banner must reach the file before the child starts.
                out_file.flush()
            except (IOError, OSError):
                # Best effort: a bad log file must not prevent the run.
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')
        return out_file


    def kill(self):
        self.proc.kill()


    def exit_code(self):
        """Return the exit status, or None while the process is running."""
        return self.proc.poll()
353
354
class Agent(object):
    """\
    Runs a queue of AgentTasks one at a time, advanced by tick().

    When a task finishes unsuccessfully, its failure_tasks are queued next;
    the tasks that were still waiting are kept behind them only when the
    failed task's clear_queue_on_failure is false.
    """

    def __init__(self, tasks):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None  # assigned by Dispatcher.add_agent()

        for each_task in tasks:
            self.add_task(each_task)


    def add_task(self, task):
        """Append `task` to the queue and bind it to this agent."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """Poll the running task, or move on if there is none or it is done."""
        print("agent tick")
        current = self.active_task
        if current and not current.is_done():
            current.poll()
        else:
            self._next_task()


    def _next_task(self):
        """Retire the finished task (handling failure) and start the next."""
        print("agent picking task")
        finished = self.active_task
        if finished:
            assert finished.is_done()
            if not finished.success:
                self.on_task_failure()

        self.active_task = None
        if self.is_done():
            return
        self.active_task = self.queue.get_nowait()
        if self.active_task:
            self.active_task.start()


    def on_task_failure(self):
        """\
        Replace the queue with the failed task's failure_tasks, followed by
        the previously waiting tasks unless clear_queue_on_failure is set.
        """
        waiting = self.queue
        self.queue = Queue.Queue(0)
        failed = self.active_task
        for recovery_task in failed.failure_tasks:
            self.add_task(recovery_task)
        if failed.clear_queue_on_failure:
            return
        while not waiting.empty():
            self.add_task(waiting.get_nowait())


    def is_done(self):
        return self.active_task == None and self.queue.empty()


    def start(self):
        """Begin running tasks; must already be attached to a dispatcher."""
        assert self.dispatcher

        self._next_task()
410
411
class AgentTask(object):
    """\
    One step executed by an Agent: optionally spawns `cmd` through
    RunMonitor and, when it exits, records success and calls epilog().
    Subclasses customise prolog()/epilog() and may override run()/tick().
    """

    def __init__(self, cmd, failure_tasks=None,
                 clear_queue_on_failure=True):
        """\
        By default, on failure, the Agent's task queue is cleared and
        replaced with the tasks in failure_tasks. If
        clear_queue_on_failure=False, the task queue will not be
        cleared, and the tasks in failure_tasks will be inserted at the
        beginning of the queue.

        When failure_tasks is omitted each task gets its own empty list,
        never one shared between instances.
        """
        if failure_tasks is None:
            failure_tasks = []
        self.done = False
        self.failure_tasks = failure_tasks
        self.clear_queue_on_failure = clear_queue_on_failure
        self.started = False
        self.cmd = cmd
        self.agent = None  # set by Agent.add_task()


    def poll(self):
        """\
        Check the spawned command; a task that never started a monitor is
        finished as a failure.
        """
        print("poll")
        if hasattr(self, 'monitor'):
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        """Finish once `exit_code` is known (None = still running); 0 = success."""
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run just before run(); does nothing by default."""
        pass


    def epilog(self):
        """Hook run after the task finishes; does nothing by default."""
        pass


    def start(self):
        """Run prolog() and run() the first time the task is started."""
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the spawned command, if one was started."""
        monitor = getattr(self, 'monitor', None)
        if monitor is not None:
            monitor.kill()


    def run(self):
        """\
        Spawn self.cmd (if any) at AUTOSERV_NICE_LEVEL; when the task has a
        host, output is appended to RESULTS_DIR/hosts/<hostname>.
        """
        if not self.cmd:
            return
        print("agent starting monitor")

        log_file = None
        if hasattr(self, 'host'):
            log_file = os.path.join(RESULTS_DIR, 'hosts', self.host.hostname)

        self.monitor = RunMonitor(self.cmd, nice_level=AUTOSERV_NICE_LEVEL,
                                  log_file=log_file)
491
492
class RepairTask(AgentTask):
    """\
    Run `autoserv -R` against a host, tracking progress in the host status.
    On failure the owning Agent keeps its remaining tasks.
    """

    def __init__(self, host):
        self.host = host
        repair_cmd = [_autoserv_path, '-R', '-m', host.hostname]
        AgentTask.__init__(self, repair_cmd, clear_queue_on_failure=False)


    def prolog(self):
        print("repair_task starting")
        self.host.set_status('Repairing')


    def epilog(self):
        if not self.success:
            self.host.set_status('Repair Failed')
        else:
            self.host.set_status('Repair Succeeded')
512
513
class VerifyTask(AgentTask):
    """\
    Run `autoserv -v` against a host, optionally on behalf of a queue entry.

    Failure tasks (see get_failure_tasks) repair and re-verify the host; the
    owning Agent's remaining tasks are kept after them.
    """

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be given
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        # The scratch results directory (and the command that uses it) is
        # created in run(), not here. Failure tasks such as the ReverifyTask
        # built below are constructed for every verify but usually never
        # run; creating the directory at construction time leaked one per
        # successful verify, since only epilog() removes it.
        self.temp_results_dir = None

        failure_tasks = self.get_failure_tasks()

        AgentTask.__init__(self, None, failure_tasks=failure_tasks,
                           clear_queue_on_failure=False)


    def get_failure_tasks(self):
        'To be overridden'
        return [RepairTask(self.host),
                ReverifyTask(self.queue_entry, self.host)]


    def prolog(self):
        print("starting verify on %s" % (self.host.hostname))
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def run(self):
        """Create the scratch results dir and start `autoserv -v`."""
        if self.temp_results_dir is None:
            self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
            self.cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
                        '-r', self.temp_results_dir]
        AgentTask.run(self)


    def epilog(self):
        if self.temp_results_dir:
            if self.queue_entry and (self.success or
                                     not self.queue_entry.meta_host):
                self.move_results()
            shutil.rmtree(self.temp_results_dir)

        if self.success:
            self.on_success()
        else:
            self.on_failure()


    def on_success(self):
        self.host.set_status('Ready')


    def on_failure(self):
        self.host.set_status('Failed Verify')
        # don't use queue_entry.requeue() here, because we don't want
        # a meta-host entry to release its host yet - that should only
        # happen after reverify fails
        if self.queue_entry:
            self.queue_entry.set_status('Queued')


    def move_results(self):
        """Move verify output into the queue entry's verify results dir."""
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        for filename in os.listdir(self.temp_results_dir):
            self.force_move(os.path.join(self.temp_results_dir, filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print('Warning: removing existing destination file ' + dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)
592
593
class ReverifyTask(VerifyTask):
    """\
    Final verification after a repair attempt. It has no failure tasks of
    its own; when it fails, the rest of the Agent's queue is dropped and the
    queue entry (if any) is handed to handle_host_failure().
    """

    def __init__(self, queue_entry=None, host=None):
        # A queue entry wins; the bare host is only used without one.
        if not queue_entry:
            VerifyTask.__init__(self, host=host)
        else:
            VerifyTask.__init__(self, queue_entry=queue_entry)
        self.clear_queue_on_failure = True


    def get_failure_tasks(self):
        return []


    def prolog(self):
        VerifyTask.prolog(self)
        if not self.queue_entry:
            return
        stale_dir = self.queue_entry.verify_results_dir()
        self.queue_entry.clear_results_dir(stale_dir)


    def on_failure(self):
        self.host.set_status('Repair Failed')
        if self.queue_entry:
            self.queue_entry.handle_host_failure()
618
619
class VerifySynchronousMixin(object):
    """\
    Success handling shared by verify tasks of synchronous jobs: mark the
    entry 'Pending' and, once the job reports it is ready, start it through
    the owning agent's dispatcher.
    """

    def on_pending(self):
        entry = self.queue_entry
        job = entry.job
        if job.num_complete() > 0:
            # some other entry failed verify, and we've
            # already been marked as stopped
            return

        entry.set_status('Pending')
        if not job.is_ready():
            return
        self.agent.dispatcher.add_agent(job.run(entry))
632
633
class VerifySynchronousTask(VerifyTask, VerifySynchronousMixin):
    """\
    Verify one entry of a synchronous job; on success the entry goes
    'Pending' and the job is launched once it is ready (see on_pending).
    """

    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry=queue_entry)


    def get_failure_tasks(self):
        repair = RepairTask(self.host)
        reverify = ReverifySynchronousTask(self.queue_entry)
        return [repair, reverify]


    def on_success(self):
        VerifyTask.on_success(self)
        self.on_pending()
647
648
class ReverifySynchronousTask(ReverifyTask, VerifySynchronousMixin):
    """\
    Post-repair re-verification for an entry of a synchronous job; success
    is handled like VerifySynchronousTask (entry 'Pending', job may start).
    """

    def __init__(self, queue_entry):
        ReverifyTask.__init__(self, queue_entry=queue_entry)


    def on_success(self):
        ReverifyTask.on_success(self)
        self.on_pending()

658
class QueueTask(AgentTask):
    """\
    Run a job's autoserv command for one or more of its queue entries:
    record queued/started/finished timestamps in the job keyval file,
    update entry and host statuses, and start result parsing afterwards.
    """

    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(queue_entry, field, value):
        """\
        Append a `field=value` line (value formatted with %d, so truncated
        to an integer) to the keyval file in the entry's results dir. The
        file is closed even if the write fails.
        """
        key_path = os.path.join(queue_entry.results_dir(), 'keyval')
        keyval_file = open(key_path, 'a')
        try:
            keyval_file.write('%s=%d\n' % (field, value))
        finally:
            keyval_file.close()


    def prolog(self):
        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.queue_entries[0], "job_queued", queued)
        self._write_keyval(self.queue_entries[0], "job_started", started)
        for queue_entry in self.queue_entries:
            print("starting queue_task on %s/%s" % (queue_entry.host.hostname,
                                                    queue_entry.id))
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
                self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.queue_entries[0], "job_finished", finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            # one results dir for the whole job: parse once it is finished
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(), flags='-l 2')

        print("queue_task finished with %s/%s" % (status, self.success))
713
714
class RebootTask(AgentTask):
    """\
    Placeholder for rebooting `host` via `autoserv -b`; tick() and run()
    are not implemented yet.

    The original constructor took no host argument yet referenced an
    undefined `host` (NameError on every call), so no working caller of the
    old signature can exist.
    """

    def __init__(self, host):
        self.host = host
        # RunMonitor expects an argument list, like the other tasks build.
        AgentTask.__init__(self, [_autoserv_path, '-b', '-m', host.hostname,
                                  '/dev/null'])


    def tick(self, exit_code):
        raise NotImplementedError("not implemented")


    def run(self):
        raise NotImplementedError("not implemented")
728
729
730
class DBObject(object):
    """\
    Minimal active-record base: one instance per row of the table named by
    the subclass's _get_table(), with each column of `fields` exposed as an
    attribute of the same name.
    """

    def __init__(self, fields, id=None, row=None, new_record=False):
        """\
        Exactly one of `id` (the row is fetched from the database) or `row`
        (column values in `fields` order) must be given. `new_record` marks
        an object that save() should INSERT.

        Raises LookupError when no row with `id` exists.
        """
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if not rows:
                raise LookupError("row not found (table=%s, id=%s)" %
                                  (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
                self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        # the id column may be read but never updated
        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table=None):
        """Return the number of rows of `table` (default: ours) matching
        the SQL fragment `where`."""
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
            """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        """Write `value` to column `field` (skipped if unchanged) and
        update the attribute."""
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
            (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """\
        INSERT this object when it was created with new_record=True.

        Column values are passed as query parameters so the database driver
        quotes them (embedded quotes are safe, None becomes NULL) instead of
        being formatted into the SQL text.
        """
        if not self.__new_record:
            return
        keys = self.__fields[1:]  # skip the id column
        columns = ','.join([str(key) for key in keys])
        placeholders = ','.join(['%s'] * len(keys))
        query = """INSERT INTO %s (%s) VALUES (%s)""" % \
            (self.__table, columns, placeholders)
        values = tuple([self.__dict__[key] for key in keys])
        _db.execute(query, values)


    def delete(self):
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        """Yield an instance for every row matching the SQL fragment `where`."""
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)

class IneligibleHostQueue(DBObject):
    """\
    A row of ineligible_host_queues: host `host_id` must not be picked for
    job `job_id` (written by HostQueueEntry.block_host()).
    """

    def __init__(self, id=None, row=None, new_record=None):
        DBObject.__init__(self, ['id', 'job_id', 'host_id'],
                          id=id, row=row, new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'

831
class Host(DBObject):
    """A row of the hosts table."""

    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        """Return the active, incomplete HostQueueEntry on this host, or None."""
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if not rows:
            return None
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def next_queue_entries(self):
        """\
        Return a one-element list holding the highest-priority pending queue
        entry this host can run (assigned to it directly, or via one of its
        labels for a job that has not blocked it), or None when there is no
        such entry or the host is locked.
        """
        if self.locked:
            print("%s locked, not queuing" % self.hostname)
            return None
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE ((host_id=%s) OR (meta_host IS NOT null AND
                (meta_host IN (
                    SELECT label_id FROM hosts_labels WHERE host_id=%s
                    )
                )
                AND job_id NOT IN (
                    SELECT job_id FROM ineligible_host_queues
                    WHERE host_id=%s
                )))
                AND NOT complete AND NOT active
                ORDER BY priority DESC, meta_host, id
                LIMIT 1
        """, (self.id, self.id, self.id))

        if not rows:
            return None
        return [HostQueueEntry(row=entry_row) for entry_row in rows]


    def yield_work(self):
        """Requeue whatever entry is currently active on this host."""
        print("%s yielding work" % self.hostname)
        # Query once: checking and then re-fetching could race with the
        # entry finishing in between (and costs a second round-trip).
        task = self.current_task()
        if task:
            task.requeue()


    def set_status(self, status):
        self.update_field('status', status)
890
891
892class HostQueueEntry(DBObject):
893 def __init__(self, id=None, row=None):
894 assert id or row
895 fields = ['id', 'job_id', 'host_id', 'priority', 'status',
896 'meta_host', 'active', 'complete']
mblighe2586682008-02-29 22:45:46 +0000897 DBObject.__init__(self, fields, id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +0000898
899 self.job = Job(self.job_id)
900
901 if self.host_id:
902 self.host = Host(self.host_id)
903 else:
904 self.host = None
905
906 self.queue_log_path = os.path.join(self.job.results_dir(),
907 'queue.log.' + str(self.id))
908
909
mblighe2586682008-02-29 22:45:46 +0000910 @classmethod
911 def _get_table(cls):
912 return 'host_queue_entries'
913
914
mbligh36768f02008-02-22 18:28:33 +0000915 def set_host(self, host):
916 if host:
917 self.queue_log_record('Assigning host ' + host.hostname)
918 self.update_field('host_id', host.id)
919 self.update_field('active', True)
mblighe2586682008-02-29 22:45:46 +0000920 self.block_host(host.id)
mbligh36768f02008-02-22 18:28:33 +0000921 else:
922 self.queue_log_record('Releasing host')
mblighe2586682008-02-29 22:45:46 +0000923 self.unblock_host(self.host.id)
mbligh36768f02008-02-22 18:28:33 +0000924 self.update_field('host_id', None)
925
926 self.host = host
927
928
929 def get_host(self):
mblighe2586682008-02-29 22:45:46 +0000930 return self.host
mbligh36768f02008-02-22 18:28:33 +0000931
932
933 def queue_log_record(self, log_line):
mblighe2586682008-02-29 22:45:46 +0000934 now = str(datetime.datetime.now())
mbligh36768f02008-02-22 18:28:33 +0000935 queue_log = open(self.queue_log_path, 'a', 0)
mblighe2586682008-02-29 22:45:46 +0000936 queue_log.write(now + ' ' + log_line + '\n')
mbligh36768f02008-02-22 18:28:33 +0000937 queue_log.close()
938
939
mblighe2586682008-02-29 22:45:46 +0000940 def block_host(self, host_id):
941 print "creating block %s/%s" % (self.job.id, host_id)
942 row = [0, self.job.id, host_id]
943 block = IneligibleHostQueue(row=row, new_record=True)
944 block.save()
945
946
947 def unblock_host(self, host_id):
948 print "removing block %s/%s" % (self.job.id, host_id)
949 blocks = list(IneligibleHostQueue.fetch(
950 'job_id=%d and host_id=%d' % (self.job.id, host_id)))
951 assert len(blocks) == 1
952 blocks[0].delete()
953
954
mbligh36768f02008-02-22 18:28:33 +0000955 def results_dir(self):
mblighe2586682008-02-29 22:45:46 +0000956 if self.job.is_synchronous() or self.job.num_machines() == 1:
957 return self.job.job_dir
mbligh36768f02008-02-22 18:28:33 +0000958 else:
959 assert self.host
mblighe2586682008-02-29 22:45:46 +0000960 return os.path.join(self.job.job_dir,
961 self.host.hostname)
mbligh36768f02008-02-22 18:28:33 +0000962
mblighe2586682008-02-29 22:45:46 +0000963
964 def verify_results_dir(self):
965 if self.job.is_synchronous() or self.job.num_machines() > 1:
966 assert self.host
967 return os.path.join(self.job.job_dir,
968 self.host.hostname)
969 else:
970 return self.job.job_dir
mbligh36768f02008-02-22 18:28:33 +0000971
972
973 def set_status(self, status):
974 self.update_field('status', status)
975 if self.host:
976 hostname = self.host.hostname
977 else:
978 hostname = 'no host'
979 print "%s/%d status -> %s" % (hostname, self.id, self.status)
980 if status in ['Queued']:
981 self.update_field('complete', False)
982 self.update_field('active', False)
983
984 if status in ['Pending', 'Running', 'Verifying', 'Starting']:
985 self.update_field('complete', False)
986 self.update_field('active', True)
987
988 if status in ['Failed', 'Completed', 'Stopped']:
989 self.update_field('complete', True)
990 self.update_field('active', False)
991
992
993 def run(self,assigned_host=None):
994 if self.meta_host:
995 assert assigned_host
mblighe2586682008-02-29 22:45:46 +0000996 # ensure results dir exists for the queue log
mbligh36768f02008-02-22 18:28:33 +0000997 self.job.create_results_dir()
998 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +0000999
mbligh36768f02008-02-22 18:28:33 +00001000 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1001 self.meta_host, self.host.hostname, self.status)
1002
1003 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001004
mbligh36768f02008-02-22 18:28:33 +00001005 def requeue(self):
1006 self.set_status('Queued')
mblighe2586682008-02-29 22:45:46 +00001007
mbligh36768f02008-02-22 18:28:33 +00001008 if self.meta_host:
1009 self.set_host(None)
1010
1011
def handle_host_failure(self):
    """\
    React to this queue entry's host failing verification and repair.

    A metahost entry is simply requeued.  An entry bound to a specific host
    is marked 'Failed'; if its job is synchronous, the job's incomplete
    entries are then stopped via Job.stop_all_entries().
    """
    if not self.meta_host:
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()
        return
    self.requeue()
1023
1024
def clear_results_dir(self, results_dir=None):
    """Empty a results directory, preserving queue log files.

    results_dir falls back to self.results_dir() when omitted or falsy.  A
    directory that does not exist is ignored.  Every entry whose name
    contains 'queue.log' is kept; everything else is deleted with
    remove_file_or_dir().
    """
    target = results_dir or self.results_dir()
    if os.path.exists(target):
        keep_marker = 'queue.log'
        doomed = [name for name in os.listdir(target)
                  if keep_marker not in name]
        for name in doomed:
            remove_file_or_dir(os.path.join(target, name))
mbligh36768f02008-02-22 18:28:33 +00001034
1035
class Job(DBObject):
    """A row of the 'jobs' table plus the logic to launch it.

    Fields loaded through DBObject: id, owner, name, priority,
    control_file, control_type, created_on, synch_type, synch_count and
    synchronizing.  Results are written under job_dir, which is
    RESULTS_DIR/<id>-<owner>.
    """

    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,
                          ['id', 'owner', 'name', 'priority',
                           'control_file', 'control_type', 'created_on',
                           'synch_type', 'synch_count', 'synchronizing'],
                          id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        """Return True unless control_type is 2.

        Non-server jobs get autoserv's '-c' flag in run() (presumably a
        client-side control file).
        """
        return self.control_type != 2


    def get_host_queue_entries(self):
        """Return a HostQueueEntry for every host_queue_entries row of this job.

        Asserts that at least one entry exists.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """Set the job status; with update_queues, also set it on every entry."""
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        """True for asynchronous jobs; for synchronous jobs, True when the
        number of Pending entries equals synch_count."""
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        """Heuristic: True once at least half of synch_count entries are
        Pending."""
        pending = [queue_entry
                   for queue_entry in self.get_host_queue_entries()
                   if queue_entry.status == 'Pending']
        # Compare in integers: count/synch_count truncates under Python 2
        # division, which made this require *all* entries to be Pending.
        return 2 * len(pending) >= self.synch_count


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally filtered by an SQL clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        """Return True when no entry of this job is left incomplete."""
        left = self.num_queued()
        print("%s: %s machines left" % (self.name, left))
        return left==0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        """Mark every not-yet-complete queue entry of this job as Stopped."""
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        """Append queue_entry's hostname to <job_dir>/.machines."""
        hostname = queue_entry.get_host().hostname
        print("writing %s to job %s machines file" % (hostname, self.id))
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        try:
            mf.write("%s\n" % hostname)
        finally:
            # close even if the write fails so the descriptor is not leaked
            mf.close()


    def create_results_dir(self, queue_entry=None):
        """Create job_dir if needed and return the relevant results dir.

        Returns queue_entry.results_dir() when an entry is given, else
        job_dir.
        """
        print("create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete()))

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def _write_control_file(self):
        """Write the job's control file (or "") to a new temp file; return its path."""
        # mkstemp creates the file atomically and securely, unlike the
        # race-prone os.tmpnam(); the handle is closed once written.
        fd, path = tempfile.mkstemp()
        ctrl = os.fdopen(fd, 'w')
        try:
            ctrl.write(self.control_file or "")
        finally:
            ctrl.close()
        return path


    def run(self, queue_entry):
        """Build and return the Agent that runs this job for queue_entry.

        A synchronous job that is not ready yet gets an Agent wrapping a
        VerifySynchronousTask instead.  Otherwise the entry is set to
        Starting, the control file is written to a temp file and an autoserv
        command line is built.  A synchronous job runs on all of its entries'
        hosts at once; an asynchronous job runs on queue_entry's host alone,
        preceded by a VerifyTask.
        """
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(queue_entry = queue_entry)])

        queue_entry.set_status('Starting')

        control_path = self._write_control_file()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-n', '-r', results_dir,
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, control_path]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        agent = Agent(tasks)

        return agent
1206
1207
1208if __name__ == '__main__':
1209 main()