#!/usr/bin/python -u

"""
Autotest scheduler
"""
__author__ = "Paul Turner <pjt@google.com>"

import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
import optparse, signal, smtplib, socket, datetime, stat, pwd
from common import global_config

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


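# idle_hosts() returns Host objects that are unlocked, Ready (or have no
# status yet), have no active queue entry, and have at least one incomplete,
# inactive queue entry that could run on them - either assigned directly or
# reachable through a matching meta_host label.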
def idle_hosts():
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in rows]
    return hosts


def queue_entries_to_abort():
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)


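# DatabaseConn wraps the MySQL connection used by the scheduler: it reads the
# AUTOTEST_WEB settings from the global config, converts Python booleans to
# integer strings for MySQL, and transparently reconnects (retrying every
# reconnect_wait seconds) if the connection is lost during execute().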
class DatabaseConn:
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        import MySQLdb.converters
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)

        self.connect()


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self):
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        global _testing_mode
        if _testing_mode:
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=DB_HOST, user=DB_USER, passwd=DB_PASS,
                    db=DB_SCHEMA, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        while (True):
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()


def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)


def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    os.system(generate_parse_command(results_dir, flags))


def send_notify_email(subject, message):
    if not _notify_email:
        return

    message = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                    time.strftime("%X %x"), message)
    sender = pwd.getpwuid(os.getuid())[0]  # see os.getlogin() online docs
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, message)
    mailer = smtplib.SMTP('localhost')
    mailer.sendmail(sender, _notify_email, msg)
    mailer.quit()


def log_stacktrace(reason):
    (type, value, tb) = sys.exc_info()
    str = "EXCEPTION: %s\n" % reason
    str += ''.join(traceback.format_exception(type, value, tb))

    sys.stderr.write("\n%s\n" % str)
    send_notify_email("monitor_db exception", str)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0  # we can't get a real exit code
    return poll_fn


def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() == None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)


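# The Dispatcher drives the scheduler's main loop.  Each tick() it aborts
# queue entries marked 'Abort', assigns new work to idle hosts, and polls its
# list of Agents, dropping the ones that have finished.  On startup,
# do_initial_recovery() reattaches to (or requeues) work left over from a
# previous scheduler run.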
class Dispatcher:
    def __init__(self):
        self._agents = []


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        self._find_aborting()
        self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def find_orphaned_autoservs(self):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes that have been orphaned.
        """
        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the four columns output by ps
        procs = [line.split(None, 3) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = [(int(proc[0]), proc[3])  # pid, args
                          for proc in procs
                          if proc[2] == 'autoserv'  # comm
                          and proc[1] == '1']  # ppid
        return dict(autoserv_procs)


    def recover_queue_entry(self, queue_entry, run_monitor):
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_orphaned_autoservs()

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir()
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host = queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        rows = _db.execute("""SELECT * FROM hosts
                              WHERE locked = 0 AND
                              (status = 'Repairing'
                               OR status = 'Verifying'
                               OR status = 'Rebooting')""")
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            self.add_agent(Agent(tasks=[VerifyTask(host=host)]))


    def _recover_hosts(self):
        # look for both "Repair Failed" hosts, which we expect, and
        # "Running" hosts with no active queue entries, which should
        # never happen
        rows = _db.execute(
            """SELECT * FROM hosts WHERE locked = 0 AND
               (status = 'Repair Failed'
                OR (status = 'Running' AND
                    id NOT IN (SELECT host_id FROM host_queue_entries
                               WHERE active)))""")
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.status == 'Running':
                print ('Recovering running host %s - this '
                       'probably indicates a scheduler bug' %
                       host.hostname)
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _find_more_work(self):
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started >= 100:
                                return
                        break
                    except:
                        next.set_status('Failed')

                        # if next.host:
                        #     next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running


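# RunMonitor launches a command with subprocess.Popen (optionally under nice
# and with stdout/stderr appended to a log file) and exposes its pid, exit
# code and a kill() helper to the AgentTasks that own it.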
class RunMonitor(object):
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except:
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """


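# PidfileRunMonitor tracks an autoserv run through the .autoserv_execute
# pidfile it leaves in the results directory: the scheduler expects that file
# to contain the autoserv pid on the first line and, once the run finishes,
# the exit status on a second line.  Constructed with cmd=None it reattaches
# to an already-running process instead of starting a new one.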
class PidfileRunMonitor(RunMonitor):
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            RunMonitor.__init__(self, cmd, nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def check_proc_fs(self, pid):
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        results_dir_arg = '\x00-r\x00%s\x00' % self.results_dir
        if results_dir_arg not in cmdline:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(cmdline))
            return False
        return True


    def read_pidfile(self):
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise Exception('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status


    def get_pidfile_info(self):
        """\
        Returns:
            None, None if autoserv has not yet run
            pid, None if autoserv is running
            pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        # double check autoserv is really running if it says it is
        if (pid is not None and exit_status is None
            and not self.check_proc_fs(pid)):
            # maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if not exit_status:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                send_notify_email(error, message)
                self.lost_process = True
                self.pid = pid
                self.exit_status = 1
                return self.pid, self.exit_status

        return pid, exit_status


    def exit_code(self):
        pid, exit_code = self.get_pidfile_info()
        assert pid is not None
        return exit_code


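# An Agent owns an ordered queue of AgentTasks for a set of queue entries and
# runs them one at a time.  If the active task finishes unsuccessfully, the
# remaining queue is replaced by that task's failure_tasks (e.g. a failed
# verify is followed by a repair).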
class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[]):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_done(self):
        return self.active_task == None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


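# AgentTask is the base class for a single unit of work (verify, repair,
# reboot, run autoserv, abort).  start() runs prolog() and then run(), which
# kicks off a RunMonitor; poll()/tick() watch the monitor's exit code and,
# when the process ends, finished() records success and calls epilog().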
class AgentTask(object):
    def __init__(self, cmd, failure_tasks = []):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code == None:
            return
        # print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def epilog(self):
        pass


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
                log_file = log_file)


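# RepairTask and VerifyTask wrap 'autoserv -R' and 'autoserv -v' runs against
# a single host.  A VerifyTask lists a RepairTask as its failure task, so a
# host that fails verification is automatically sent to repair; a
# non-metahost queue entry is passed along as fail_queue_entry so it can be
# marked Failed if the repair also fails.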
class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        cmd = [_autoserv_path, '-R', '-m', host.hostname]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        AgentTask.__init__(self, cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        AgentTask.__init__(self, cmd, failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def epilog(self):
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        shutil.rmtree(self.temp_results_dir)

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self.force_move(os.path.join(self.temp_results_dir,
                                         filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' +
                   dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            self.queue_entry.set_status('Pending')
            job = self.queue_entry.job
            if job.is_ready():
                agent = job.run(self.queue_entry)
                self.agent.dispatcher.add_agent(agent)


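# QueueTask runs the actual autoserv job for a set of queue entries via a
# PidfileRunMonitor.  Its prolog writes the tko parse command and the
# queued/started timestamps into the results directory and marks entries
# Running; its epilog records the finished timestamp, marks entries Completed
# or Failed, and kicks off results parsing.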
class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(results_dir, field, value):
        key_path = os.path.join(results_dir, 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def results_dir(self):
        return self.queue_entries[0].results_dir()


    def run(self):
        """\
        Override AgentTask.run() so we can use a PidfileRunMonitor.
        """
        self.monitor = PidfileRunMonitor(self.results_dir(),
                                         cmd=self.cmd,
                                         nice_level=AUTOSERV_NICE_LEVEL)


    def prolog(self):
        # write the parser commands into the results directories
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            results_dir = self.job.results_dir()
            cmdfile = os.path.join(results_dir, '.parse.cmd')
            cmd = generate_parse_command(results_dir)
            print >> open(cmdfile, 'w'), cmd
        else:
            for queue_entry in self.queue_entries:
                results_dir = queue_entry.results_dir()
                cmdfile = os.path.join(results_dir,
                                       '.parse.cmd')
                cmd = generate_parse_command(results_dir,
                                             '-l 2')
                print >> open(cmdfile, 'w'), cmd

        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.results_dir(), "job_queued", queued)
        self._write_keyval(self.results_dir(), "job_started", started)
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.results_dir(), "job_finished", finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(),
                              flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)


class RecoveryQueueTask(QueueTask):
    def __init__(self, job, queue_entries, run_monitor):
        QueueTask.__init__(self, job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass


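# RebootTask reboots a host via 'autoserv -b'; AbortTask detaches the given
# agents from the dispatcher and kills their active tasks, marking the queue
# entry Aborting and then Aborted.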
class RebootTask(AgentTask):
    def __init__(self, host):
        global _autoserv_path

        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '/dev/null']
        self.host = host
        AgentTask.__init__(self, self.cmd,
                           failure_tasks=[RepairTask(host)])


    def prolog(self):
        print "starting reboot task for host: %s" % self.host.hostname
        self.host.set_status("Rebooting")


class AbortTask(AgentTask):
    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for agent in agents_to_abort:
            agent.dispatcher.remove_agent(agent)
        AgentTask.__init__(self, '')


    def prolog(self):
        print "starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id)
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        self.queue_entry.set_status('Aborted')
        self.success = True

    def run(self):
        for agent in self.agents_to_abort:
            if agent.active_task:
                agent.active_task.abort()


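# DBObject is a very small ORM-style base class: subclasses name their table
# via _get_table() and pass the column list to __init__, which loads a row by
# id (or wraps one already fetched) and exposes the columns as attributes.
# update_field(), save(), delete() and fetch() issue the corresponding SQL
# through the global _db connection.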
class DBObject(object):
    def __init__(self, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table = None):
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
            (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        if self.__new_record:
            keys = self.__fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                (self.__table, columns, values)
            _db.execute(query)


    def delete(self):
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)


class IneligibleHostQueue(DBObject):
    def __init__(self, id=None, row=None, new_record=None):
        fields = ['id', 'job_id', 'host_id']
        DBObject.__init__(self, fields, id=id, row=row,
                          new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'


class Host(DBObject):
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0]
            # print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
        # print "%s/%s looking for work" % (self.hostname, self.platform_id)
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        else:
            return [HostQueueEntry(row=i) for i in rows]

    def yield_work(self):
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self, status):
        print '%s -> %s' % (self.hostname, status)
        self.update_field('status', status)


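# HostQueueEntry ties one host (or meta_host label) to a job.  set_status()
# keeps the active/complete flags consistent with the status string, run()
# hands the entry to Job.run(), and requeue()/handle_host_failure() put the
# entry back in the queue or mark it Failed when its host cannot be verified.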
class HostQueueEntry(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, fields, id=id, row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(now + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        print "creating block %s/%s" % (self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        print "removing block %s/%s" % (self.job.id, host_id)
        blocks = list(IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id)))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self, assigned_host=None):
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
            self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None):
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        for filename in os.listdir(results_dir):
            if 'queue.log' in filename:
                continue
            path = os.path.join(results_dir, filename)
            remove_file_or_dir(path)


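# Job wraps a row of the jobs table.  synch_type == 2 means a synchronous
# job: run() first collects 'Pending' entries via VerifySynchronousTask and
# only launches autoserv once synch_count hosts are ready; asynchronous jobs
# get a VerifyTask plus a QueueTask per entry.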
class Job(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,
                          ['id', 'owner', 'name', 'priority',
                           'control_file', 'control_type', 'created_on',
                           'synch_type', 'synch_count', 'synchronizing'],
                          id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        return self.control_type != 2


    def get_host_queue_entries(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
            """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (count / self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left == 0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        print "writing %s to job %s machines file" % (hostname, self.id)
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        mf.write("%s\n" % queue_entry.get_host().hostname)
        mf.close()


    def create_results_dir(self, queue_entry=None):
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry = queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-p', '-n',
                  '-r', os.path.abspath(results_dir),
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        ids = []
        for entry in queue_entries:
            ids.append(entry.id)

        agent = Agent(tasks, ids)

        return agent


if __name__ == '__main__':
    main()