#!/usr/bin/python -u

"""
Autotest scheduler
"""
__author__ = "Paul Turner <pjt@google.com>"

import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
import optparse, signal, smtplib, socket, datetime, stat, pwd
from common import global_config

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd

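# A host is considered idle when it has no active queue entry but does have
# at least one incomplete entry it could service, either assigned to it
# directly or reachable through one of its labels (meta_host), and it is
# unlocked with a status of 'Ready' (or no status at all).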
def idle_hosts():
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in rows]
    return hosts

def queue_entries_to_abort():
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe

def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)

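# Thin wrapper around MySQLdb that reads its connection settings from the
# AUTOTEST_WEB section of the global config and reconnects automatically:
# execute() retries, sleeping reconnect_wait seconds between attempts,
# whenever the connection raises OperationalError.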
class DatabaseConn:
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        self.connect()


    def connect(self):
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        global _testing_mode
        if _testing_mode:
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(host=DB_HOST, user=DB_USER,
                                            passwd=DB_PASS, db=DB_SCHEMA)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()


def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)


def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    os.system(generate_parse_command(results_dir, flags))


def send_notify_email(subject, message):
    if not _notify_email:
        return

    message = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                    time.strftime("%X %x"), message)
    sender = pwd.getpwuid(os.getuid())[0]  # see os.getlogin() online docs
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, message)
    mailer = smtplib.SMTP('localhost')
    mailer.sendmail(sender, _notify_email, msg)
    mailer.quit()


def log_stacktrace(reason):
    (type, value, tb) = sys.exc_info()
    str = "EXCEPTION: %s\n" % reason
    str += ''.join(traceback.format_exception(type, value, tb))

    sys.stderr.write("\n%s\n" % str)
    send_notify_email("monitor_db exception", str)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0  # we can't get a real exit code
    return poll_fn

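# Note: SIGCONT is sent before SIGTERM, presumably so that a process which
# has been stopped is woken up and actually gets to act on the TERM.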
def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() == None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)

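# The Dispatcher is the scheduler's main loop.  Each tick() it aborts any
# queue entries the frontend has marked 'Abort', hands queued work to idle
# hosts (capped at 100 new entries per tick), and then polls every running
# Agent, dropping the ones that have finished.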
class Dispatcher:
    def __init__(self):
        self._agents = []


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        self._find_aborting()
        self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def find_orphaned_autoservs(self):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes that have been orphaned.
        """
        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the four columns output by ps
        procs = [line.split(None, 3) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = [(int(proc[0]), proc[3])  # pid, args
                          for proc in procs
                          if proc[2] == 'autoserv'  # comm
                          and proc[1] == '1']  # ppid
        return dict(autoserv_procs)


    def recover_queue_entry(self, queue_entry, run_monitor):
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_orphaned_autoservs()

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir()
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host = queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        rows = _db.execute("""SELECT * FROM hosts
                              WHERE locked = 0 AND
                              (status = 'Repairing'
                               OR status = 'Verifying'
                               OR status = 'Rebooting')""")
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            self.add_agent(Agent(tasks=[VerifyTask(host=host)]))


    def _recover_hosts(self):
        # look for both "Repair Failed" hosts, which we expect, and
        # "Running" hosts with no active queue entries, which should
        # never happen
        rows = _db.execute(
            """SELECT * FROM hosts WHERE locked = 0 AND
               (status = 'Repair Failed'
                OR (status = 'Running' AND
                    id NOT IN (SELECT host_id FROM host_queue_entries
                               WHERE active)))""")
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.status == 'Running':
                print ('Recovering running host %s - this '
                       'probably indicates a scheduler bug' %
                       host.hostname)
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _find_more_work(self):
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started >= 100:
                                return
                        break
                    except:
                        next.set_status('Failed')

#                        if next.host:
#                            next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running

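# RunMonitor launches a command (optionally under nice) with stdin redirected
# from /dev/null, writes a timestamped banner to the per-host log file when
# one is given, and exposes the child's pid and exit code to the owning task.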
class RunMonitor(object):
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except:
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """

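# PidfileRunMonitor tracks autoserv through the .autoserv_execute pidfile it
# writes in the results directory: the first line is autoserv's pid and a
# second line, added on completion, is its exit status.  When the scheduler
# restarts it can reattach to an existing pidfile (cmd=None) instead of
# spawning a new process, and it cross-checks /proc/<pid>/cmdline to make
# sure the pid really belongs to the autoserv run for this results directory.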
class PidfileRunMonitor(RunMonitor):
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            RunMonitor.__init__(self, cmd, nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def check_proc_fs(self, pid):
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        results_dir_arg = '\x00-r\x00%s\x00' % self.results_dir
        if results_dir_arg not in cmdline:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(cmdline))
            return False
        return True


    def read_pidfile(self):
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise Exception('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status


    def get_pidfile_info(self):
        """\
        Returns:
        None, None if autoserv has not yet run
        pid, None if autoserv is running
        pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        # double check autoserv is really running if it says it is
        if (pid is not None and exit_status is None
            and not self.check_proc_fs(pid)):
            # maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if not exit_status:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                send_notify_email(error, message)
                self.lost_process = True
                self.pid = pid
                self.exit_status = 1
                return self.pid, self.exit_status

        return pid, exit_status


    def exit_code(self):
        pid, exit_code = self.get_pidfile_info()
        assert pid is not None
        return exit_code

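# An Agent owns an ordered queue of AgentTasks (verify, run, repair, ...) for
# a particular set of host queue entries.  The dispatcher ticks each agent;
# when the active task fails, the agent throws away the rest of its queue and
# runs that task's failure_tasks instead (e.g. a failed verify leads to a
# repair).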
class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[]):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_done(self):
        return self.active_task == None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


class AgentTask(object):
    def __init__(self, cmd, failure_tasks = []):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code == None:
            return
#        print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def epilog(self):
        pass


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
                log_file = log_file)


class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        cmd = [_autoserv_path, '-R', '-m', host.hostname]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        AgentTask.__init__(self, cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        AgentTask.__init__(self, cmd, failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def epilog(self):
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        shutil.rmtree(self.temp_results_dir)

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self.force_move(os.path.join(self.temp_results_dir,
                                         filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' +
                   dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            self.queue_entry.set_status('Pending')
            job = self.queue_entry.job
            if job.is_ready():
                agent = job.run(self.queue_entry)
                self.agent.dispatcher.add_agent(agent)

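# QueueTask runs the actual autoserv job for one or more queue entries.  Its
# prolog records the parse command and the job_queued/job_started keyvals and
# marks the hosts Running; its epilog writes job_finished, flips the entries
# to Completed or Failed and kicks off the TKO results parser.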
class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(results_dir, field, value):
        key_path = os.path.join(results_dir, 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def results_dir(self):
        return self.queue_entries[0].results_dir()


    def run(self):
        """\
        Override AgentTask.run() so we can use a PidfileRunMonitor.
        """
        self.monitor = PidfileRunMonitor(self.results_dir(),
                                         cmd=self.cmd,
                                         nice_level=AUTOSERV_NICE_LEVEL)


    def prolog(self):
        # write the parser commands into the results directories
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            results_dir = self.job.results_dir()
            cmdfile = os.path.join(results_dir, '.parse.cmd')
            cmd = generate_parse_command(results_dir)
            print >> open(cmdfile, 'w'), cmd
        else:
            for queue_entry in self.queue_entries:
                results_dir = queue_entry.results_dir()
                cmdfile = os.path.join(results_dir,
                                       '.parse.cmd')
                cmd = generate_parse_command(results_dir,
                                             '-l 2')
                print >> open(cmdfile, 'w'), cmd

        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.results_dir(), "job_queued", queued)
        self._write_keyval(self.results_dir(), "job_started", started)
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.results_dir(), "job_finished", finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(),
                              flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)


class RecoveryQueueTask(QueueTask):
    def __init__(self, job, queue_entries, run_monitor):
        QueueTask.__init__(self, job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass


class RebootTask(AgentTask):
    def __init__(self, host):
        global _autoserv_path

        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '/dev/null']
        self.host = host
        AgentTask.__init__(self, self.cmd,
                           failure_tasks=[RepairTask(host)])


    def prolog(self):
        print "starting reboot task for host: %s" % self.host.hostname
        self.host.set_status("Rebooting")


class AbortTask(AgentTask):
    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for agent in agents_to_abort:
            agent.dispatcher.remove_agent(agent)
        AgentTask.__init__(self, '')


    def prolog(self):
        print "starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id)
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        self.queue_entry.set_status('Aborted')
        self.success = True

    def run(self):
        for agent in self.agents_to_abort:
            if (agent.active_task):
                agent.active_task.abort()

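# DBObject is a minimal object-relational layer: it loads one row (by id or
# from a row tuple), exposes the columns as attributes, and provides
# update_field/save/delete/fetch helpers.  Subclasses only need to supply the
# field list and _get_table().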
class DBObject(object):
    def __init__(self, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table = None):
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
            (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        if self.__new_record:
            keys = self.__fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                (self.__table, columns, values)
            _db.execute(query)


    def delete(self):
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)


class IneligibleHostQueue(DBObject):
    def __init__(self, id=None, row=None, new_record=None):
        fields = ['id', 'job_id', 'host_id']
        DBObject.__init__(self, fields, id=id, row=row,
                          new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'


class Host(DBObject):
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0]
#            print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
#        print "%s/%s looking for work" % (self.hostname, self.platform_id)
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        else:
            return [HostQueueEntry(row=i) for i in rows]

    def yield_work(self):
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self, status):
        print '%s -> %s' % (self.hostname, status)
        self.update_field('status', status)

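# A HostQueueEntry ties a job to a host.  meta_host entries carry a label id
# instead of a concrete host; when a host is assigned, block_host() records
# the job/host pair in ineligible_host_queues so the same job is not handed
# to that host again through another metahost entry.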
class HostQueueEntry(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, fields, id=id, row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(now + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        print "creating block %s/%s" % (self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        print "removing block %s/%s" % (self.job.id, host_id)
        blocks = list(IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id)))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self, assigned_host=None):
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
            self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None):
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        for filename in os.listdir(results_dir):
            if 'queue.log' in filename:
                continue
            path = os.path.join(results_dir, filename)
            remove_file_or_dir(path)

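# Job wraps a row from the jobs table.  synch_type == 2 marks a synchronous
# job (all hosts must reach 'Pending' before autoserv starts), and run()
# assembles the autoserv command line for the entries being started.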
class Job(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,
                          ['id', 'owner', 'name', 'priority',
                           'control_file', 'control_type', 'created_on',
                           'synch_type', 'synch_count', 'synchronizing'],
                          id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        return self.control_type != 2


    def get_host_queue_entries(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (count/self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left == 0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        print "writing %s to job %s machines file" % (hostname, self.id)
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        mf.write("%s\n" % queue_entry.get_host().hostname)
        mf.close()


    def create_results_dir(self, queue_entry=None):
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry = queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-p', '-n',
                  '-r', os.path.abspath(results_dir),
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        ids = []
        for entry in queue_entries:
            ids.append(entry.id)

        agent = Agent(tasks, ids)

        return agent


if __name__ == '__main__':
    main()