#!/usr/bin/python -u

"""
Autotest scheduler
"""
__author__ = "Paul Turner <pjt@google.com>"

import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
import optparse, signal, smtplib, socket, datetime, stat, pwd, errno
from common import global_config

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min
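# The pidfile (.autoserv_execute in the results dir) is read by
# PidfileRunMonitor.read_pidfile() below: the first line holds the autoserv
# pid and, once autoserv exits, an optional second line holds its exit status.
# Illustrative contents only (values made up):
#   12345
#   0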

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()
80
81def handle_sigint(signum, frame):
82 global _shutdown
83 _shutdown = True
84 print "Shutdown request received."
85
86
87def init(logfile):
88 if logfile:
89 enable_logging(logfile)
90 print "%s> dispatcher starting" % time.strftime("%X %x")
91 print "My PID is %d" % os.getpid()
92
93 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
mbligh6f8bab42008-02-29 22:45:14 +000094 global _db
95 _db = DatabaseConn()
mbligh36768f02008-02-22 18:28:33 +000096
97 print "Setting signal handler"
98 signal.signal(signal.SIGINT, handle_sigint)
99
100 print "Connected! Running..."
101
102
103def enable_logging(logfile):
104 out_file = logfile
105 err_file = "%s.err" % logfile
106 print "Enabling logging to %s (%s)" % (out_file, err_file)
107 out_fd = open(out_file, "a", buffering=0)
108 err_fd = open(err_file, "a", buffering=0)
109
110 os.dup2(out_fd.fileno(), sys.stdout.fileno())
111 os.dup2(err_fd.fileno(), sys.stderr.fileno())
112
113 sys.stdout = out_fd
114 sys.stderr = err_fd
115
116
def idle_hosts():
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND invalid=false
        AND (h.status IS null OR h.status='Ready')""")
    hosts = [Host(row=i) for i in rows]
    return hosts


def queue_entries_to_abort():
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)


class DatabaseConn:
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        import MySQLdb.converters
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)

        self.connect()


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self):
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        global _testing_mode
        if _testing_mode:
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=DB_HOST, user=DB_USER, passwd=DB_PASS,
                    db=DB_SCHEMA, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()
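
# DatabaseConn is only used through the module-level _db handle; a minimal
# usage sketch (the host_id variable here is illustrative only):
#   rows = _db.execute('SELECT * FROM hosts WHERE id = %s', (host_id,))
# Parameters are passed straight through to MySQLdb's cursor.execute(), which
# handles the quoting.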


def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)


def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    os.system(generate_parse_command(results_dir, flags))
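
# The parse command runs in the background via os.system(); with the template
# above it expands to something like (paths are hypothetical):
#   /path/to/tko/parse -l 2 -r -o /results/42-user > /results/42-user/.parse.log 2>&1 &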


def send_notify_email(subject, message):
    if not _notify_email:
        return

    message = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                    time.strftime("%X %x"), message)
    sender = pwd.getpwuid(os.getuid())[0]  # see os.getlogin() online docs
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, message)
    mailer = smtplib.SMTP('localhost')
    mailer.sendmail(sender, _notify_email, msg)
    mailer.quit()


def log_stacktrace(reason):
    # use local names that don't shadow the type() and str() builtins
    (exc_type, exc_value, exc_tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    sys.stderr.write("\n%s\n" % message)
    send_notify_email("monitor_db exception", message)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0  # we can't get a real exit code
    return poll_fn


def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() is None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)
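
# kill_autoserv() is also used by RunMonitor.kill() below, which passes its
# own exit_code method as poll_fn, i.e. roughly:
#   kill_autoserv(self.get_pid(), self.exit_code)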


class Dispatcher:
    autoserv_procs_cache = None

    def __init__(self):
        self._agents = []


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        Dispatcher.autoserv_procs_cache = None
        self._find_aborting()
        self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes. If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the five columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans (ps fields come back as strings)
            if orphans_only and proc[2] != '1':
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and  # comm
                proc[1] == proc[0]):       # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs
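
    # Sketch of the mapping this returns, assuming one root autoserv process
    # (pid and command line are made up):
    #   {12345: 'autoserv -P 42-user -m host1 -r /results/42-user ...'}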


    def recover_queue_entry(self, queue_entry, run_monitor):
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host = queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')
            rebooting_host_ids.add(queue_host.id)

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _find_more_work(self):
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started >= 100:
                                return
                        break
                    except:
                        next.set_status('Failed')

                        # if next.host:
                        #     next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running


class RunMonitor(object):
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()
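
# A rough sketch of how AgentTask.run() below drives RunMonitor (the command
# and log path here are illustrative only):
#   monitor = RunMonitor(['autoserv', '-v', '-m', 'somehost'],
#                        nice_level=AUTOSERV_NICE_LEVEL,
#                        log_file='results/hosts/somehost')
#   # later, exit_code() returns None while running, 0 on success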


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """


class PidfileRunMonitor(RunMonitor):
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            super(PidfileRunMonitor, self).__init__(cmd,
                                                    nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self, pid):
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def read_pidfile(self):
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise Exception('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status


    def _find_autoserv_proc(self):
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
            None, None if autoserv has not yet run
            pid, None if autoserv is running
            pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                send_notify_email(error, message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            send_notify_email('Process has failed to write pidfile',
                              message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1


    def exit_code(self):
        pid, exit_code = self.get_pidfile_info()
        return exit_code


class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[]):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
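
# How the pieces fit together: the Dispatcher owns a list of Agents; each
# Agent runs its AgentTasks one at a time, and a failed task replaces the
# remaining queue with that task's failure_tasks. A typical verify-then-run
# sequence looks roughly like this (illustrative only):
#   agent = Agent(tasks=[VerifyTask(queue_entry=entry), QueueTask(...)],
#                 queue_entry_ids=[entry.id])
#   dispatcher.add_agent(agent)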


class AgentTask(object):
    def __init__(self, cmd, failure_tasks = []):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
        # print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


    def cleanup(self):
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
                log_file = log_file)


class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-R', '-m', host.hostname,
               '-r', self.temp_results_dir]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        super(RepairTask, self).__init__(cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        super(VerifyTask, self).__init__(cmd,
                                         failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def cleanup(self):
        if not os.path.exists(self.temp_results_dir):
            return
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        super(VerifyTask, self).cleanup()


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self.force_move(os.path.join(self.temp_results_dir,
                                         filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' +
                   dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def __init__(self, queue_entry):
        super(VerifySynchronousTask, self).__init__(
            queue_entry = queue_entry)


    def epilog(self):
        super(VerifySynchronousTask, self).epilog()
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            self.queue_entry.set_status('Pending')
            job = self.queue_entry.job
            if job.is_ready():
                agent = job.run(self.queue_entry)
                self.agent.dispatcher.add_agent(agent)


class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        super(QueueTask, self).__init__(cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(results_dir, field, value):
        key_path = os.path.join(results_dir, 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()
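
    # Each keyval line has the form "<field>=<integer value>", e.g.
    # (timestamp made up):
    #   job_queued=1209000000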


    def results_dir(self):
        return self.queue_entries[0].results_dir()


    def run(self):
        """\
        Override AgentTask.run() so we can use a PidfileRunMonitor.
        """
        self.monitor = PidfileRunMonitor(self.results_dir(),
                                         cmd=self.cmd,
                                         nice_level=AUTOSERV_NICE_LEVEL)


    def prolog(self):
        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.results_dir(), "job_queued", queued)
        self._write_keyval(self.results_dir(), "job_started", started)
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        super(QueueTask, self).epilog()
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.results_dir(), "job_finished", finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(),
                              flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)


class RecoveryQueueTask(QueueTask):
    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job,
                                                queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass


class RebootTask(AgentTask):
    def __init__(self, host):
        global _autoserv_path

        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.create_temp_resultsdir('.reboot')
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '-r', self.temp_results_dir, '/dev/null']
        self.host = host
        super(RebootTask, self).__init__(self.cmd,
                                         failure_tasks=[RepairTask(host)])


    def prolog(self):
        print "starting reboot task for host: %s" % self.host.hostname
        self.host.set_status("Rebooting")


class AbortTask(AgentTask):
    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for agent in agents_to_abort:
            agent.dispatcher.remove_agent(agent)
        super(AbortTask, self).__init__('')


    def prolog(self):
        print "starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id)
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        for agent in self.agents_to_abort:
            if agent.active_task:
                agent.active_task.abort()


class DBObject(object):
    def __init__(self, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table = None):
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        if self.__new_record:
            keys = self.__fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, values)
            _db.execute(query)


    def delete(self):
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)
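
# DBObject.fetch() yields one wrapper object per matching row; a minimal
# sketch of how the subclasses below use it (the WHERE clause is illustrative):
#   for host in Host.fetch("status = 'Ready'"):
#       print host.hostname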


class IneligibleHostQueue(DBObject):
    def __init__(self, id=None, row=None, new_record=None):
        fields = ['id', 'job_id', 'host_id']
        super(IneligibleHostQueue, self).__init__(fields, id=id,
                                                  row=row, new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'


class Host(DBObject):
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status',
                  'invalid']
        super(Host, self).__init__(fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0]
            # print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
        # print "%s/%s looking for work" % (self.hostname, self.platform_id)
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        else:
            return [HostQueueEntry(row=i) for i in rows]

    def yield_work(self):
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self, status):
        print '%s -> %s' % (self.hostname, status)
        self.update_field('status', status)


class HostQueueEntry(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        super(HostQueueEntry, self).__init__(fields, id=id, row=row)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(now + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        print "creating block %s/%s" % (self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        print "removing block %s/%s" % (self.job.id, host_id)
        blocks = list(IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id)))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)
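
    # To summarize the transitions above: 'Queued' -> inactive/incomplete,
    # the in-flight statuses -> active/incomplete, and the terminal statuses
    # ('Failed', 'Completed', 'Stopped', 'Aborted') -> inactive/complete.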


    def run(self, assigned_host=None):
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                                                    self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None, dont_delete_files=False):
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        if dont_delete_files:
            temp_dir = tempfile.mkdtemp(suffix='.clear_results')
            print 'Moving results from %s to %s' % (results_dir,
                                                    temp_dir)
        for filename in os.listdir(results_dir):
            path = os.path.join(results_dir, filename)
            if dont_delete_files:
                shutil.move(path,
                            os.path.join(temp_dir, filename))
            else:
                remove_file_or_dir(path)


class Job(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        super(Job, self).__init__(
            ['id', 'owner', 'name', 'priority',
             'control_file', 'control_type', 'created_on',
             'synch_type', 'synch_count', 'synchronizing'],
            id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        return self.control_type != 2


    def get_host_queue_entries(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
            """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (count/self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left == 0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        print "writing %s to job %s machines file" % (hostname, self.id)
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        mf.write("%s\n" % queue_entry.get_host().hostname)
        mf.close()


    def create_results_dir(self, queue_entry=None):
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry = queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        # determine the job tag
        if self.is_synchronous() or self.num_machines() == 1:
            job_name = "%s-%s" % (self.id, self.owner)
        else:
            job_name = "%s-%s/%s" % (self.id, self.owner,
                                     hostnames)

        params = [_autoserv_path, '-P', job_name, '-p', '-n',
                  '-r', os.path.abspath(results_dir),
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]
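
        # With the arguments above, the resulting invocation looks roughly
        # like this (job id, owner and hosts are made-up examples):
        #   autoserv -P 42-someuser -p -n -r /results/42-someuser \
        #       -b -u someuser -l myjob -m host1,host2 /tmp/tmpXXXXXX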

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        ids = []
        for entry in queue_entries:
            ids.append(entry.id)

        agent = Agent(tasks, ids)

        return agent


if __name__ == '__main__':
    main()