#!/usr/bin/python -u

"""
Autotest scheduler
"""
__author__ = "Paul Turner <pjt@google.com>"

import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
import optparse, signal, smtplib, socket, datetime, stat, pwd
from common import global_config

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def idle_hosts():
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries
                    WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id
                    WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in rows]
    return hosts


def queue_entries_to_abort():
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)

class DatabaseConn:
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        import MySQLdb.converters
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)

        self.connect()


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self):
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        global _testing_mode
        if _testing_mode:
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=DB_HOST, user=DB_USER, passwd=DB_PASS,
                    db=DB_SCHEMA, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()


def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)
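# For reference, the command generate_parse_command() builds looks roughly
# like this (paths illustrative only):
#   /usr/local/autotest/tko/parse -l 2 -n -r -o /results/42-user > /results/42-user/.parse.log 2>&1 &
# i.e. the TKO parser is pushed into the background with its output captured
# in a .parse.log file inside the results directory.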


def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    os.system(generate_parse_command(results_dir, flags))


def send_notify_email(subject, message):
    if not _notify_email:
        return

    message = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                    time.strftime("%X %x"), message)
    sender = pwd.getpwuid(os.getuid())[0]  # see os.getlogin() online docs
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, message)
    mailer = smtplib.SMTP('localhost')
    mailer.sendmail(sender, _notify_email, msg)
    mailer.quit()


def log_stacktrace(reason):
    (type, value, tb) = sys.exc_info()
    str = "EXCEPTION: %s\n" % reason
    str += ''.join(traceback.format_exception(type, value, tb))

    sys.stderr.write("\n%s\n" % str)
    send_notify_email("monitor_db exception", str)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0  # we can't get a real exit code
    return poll_fn


def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() == None:
        # SIGCONT first so that a stopped process still receives the SIGTERM
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)

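# Scheduling model, as implemented below: the Dispatcher polls the database
# once per tick and turns any work it finds into Agent objects. An Agent owns
# an ordered queue of AgentTasks (verify, repair, reboot, abort, queue/run),
# and tasks that need a subprocess launch autoserv through a RunMonitor or
# PidfileRunMonitor. Each tick handles abort requests first, then hands
# queued entries to idle hosts, then polls every live agent and drops the
# finished ones.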
class Dispatcher:
    autoserv_procs_cache = None

    def __init__(self):
        self._agents = []


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        Dispatcher.autoserv_procs_cache = None
        self._find_aborting()
        self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self


    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes. If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the five columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans (ps fields are strings)
            if orphans_only and proc[2] != '1':
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and    # comm
                proc[1] == proc[0]):         # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs


    def recover_queue_entry(self, queue_entry, run_monitor):
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()

        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host=queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')
            rebooting_host_ids.add(queue_host.id)

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND ' +
                           where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host=host)
            self.add_agent(Agent(tasks=[verify_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _find_more_work(self):
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started >= 100:
                                return
                        break
                    except:
                        next.set_status('Failed')

                        # if next.host:
                        #     next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host=entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids=[entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running

class RunMonitor(object):
    def __init__(self, cmd, nice_level=None, log_file=None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                # create the log directory if needed, but don't let an
                # already-existing directory keep us from opening the log
                if not os.path.exists(os.path.dirname(self.log_file)):
                    os.makedirs(os.path.dirname(self.log_file))
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*' * 80))
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*' * 80))
            except:
                pass
        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()

class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """


class PidfileRunMonitor(RunMonitor):
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            super(PidfileRunMonitor, self).__init__(cmd,
                                                    nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self, pid):
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)

    def read_pidfile(self):
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise PidfileException('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status
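
    # Illustrative pidfile contents (the numbers are made up): autoserv
    # writes its pid as the first line when it starts and appends the exit
    # status as a second line when it finishes, e.g.
    #
    #   13524
    #   0
    #
    # so read_pidfile() returns (13524, None) while autoserv is still running
    # and (13524, 0) once it has exited cleanly.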


    def _find_autoserv_proc(self):
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
        None, None if autoserv has not yet run
        pid, None if autoserv is running
        pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                send_notify_email(error, message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            send_notify_email('Process has failed to write pidfile',
                              message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1


    def exit_code(self):
        pid, exit_code = self.get_pidfile_info()
        return exit_code


class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[]):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_done(self):
        return self.active_task == None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()

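# An AgentTask is the unit of work an Agent executes: prolog() -> run()
# (which normally wraps an autoserv invocation in a RunMonitor) -> repeated
# poll() calls until the monitored process exits -> finished()/epilog().
# When a task fails, Agent.on_task_failure() replaces the remaining queue
# with the task's failure_tasks, e.g. a failed verify queues a repair.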
class AgentTask(object):
    def __init__(self, cmd, failure_tasks=[]):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code == None:
            return
        # print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


    def cleanup(self):
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level=AUTOSERV_NICE_LEVEL,
                log_file=log_file)


class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-R', '-m', host.hostname,
               '-r', self.temp_results_dir]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        super(RepairTask, self).__init__(cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        super(VerifyTask, self).__init__(cmd,
                                         failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def cleanup(self):
        if not os.path.exists(self.temp_results_dir):
            return
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        super(VerifyTask, self).cleanup()


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self.force_move(os.path.join(self.temp_results_dir,
                                         filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' +
                   dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def __init__(self, queue_entry):
        super(VerifySynchronousTask, self).__init__(
            queue_entry=queue_entry)


    def epilog(self):
        super(VerifySynchronousTask, self).epilog()
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            self.queue_entry.set_status('Pending')
            job = self.queue_entry.job
            if job.is_ready():
                agent = job.run(self.queue_entry)
                self.agent.dispatcher.add_agent(agent)


class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        super(QueueTask, self).__init__(cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(results_dir, field, value):
        key_path = os.path.join(results_dir, 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def results_dir(self):
        return self.queue_entries[0].results_dir()


    def run(self):
        """\
        Override AgentTask.run() so we can use a PidfileRunMonitor.
        """
        self.monitor = PidfileRunMonitor(self.results_dir(),
                                         cmd=self.cmd,
                                         nice_level=AUTOSERV_NICE_LEVEL)


    def prolog(self):
        # write the parser commands into the results directories
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            results_dir = self.job.results_dir()
            cmdfile = os.path.join(results_dir, '.parse.cmd')
            cmd = generate_parse_command(results_dir, '-n')
            print >> open(cmdfile, 'w'), cmd
        else:
            for queue_entry in self.queue_entries:
                results_dir = queue_entry.results_dir()
                cmdfile = os.path.join(results_dir,
                                       '.parse.cmd')
                cmd = generate_parse_command(results_dir,
                                             '-l 2 -n')
                print >> open(cmdfile, 'w'), cmd

        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.results_dir(), "job_queued", queued)
        self._write_keyval(self.results_dir(), "job_started", started)
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (
                queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        super(QueueTask, self).epilog()
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.results_dir(), "job_finished", finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(),
                              flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)


class RecoveryQueueTask(QueueTask):
    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job,
                                                queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass


class RebootTask(AgentTask):
    def __init__(self, host):
        global _autoserv_path

        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.create_temp_resultsdir('.reboot')
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '-r', self.temp_results_dir, '/dev/null']
        self.host = host
        super(RebootTask, self).__init__(self.cmd,
                                         failure_tasks=[RepairTask(host)])


    def prolog(self):
        print "starting reboot task for host: %s" % self.host.hostname
        self.host.set_status("Rebooting")


class AbortTask(AgentTask):
    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for agent in agents_to_abort:
            agent.dispatcher.remove_agent(agent)
        super(AbortTask, self).__init__('')


    def prolog(self):
        print "starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id)
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        for agent in self.agents_to_abort:
            if agent.active_task:
                agent.active_task.abort()

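# The classes below form a minimal hand-rolled ORM: each DBObject subclass
# names its table via _get_table() and lists its columns in order, the
# constructor copies one row's values onto attributes of the same names, and
# update_field()/save()/delete() issue the corresponding SQL through the
# global _db connection.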
class DBObject(object):
    def __init__(self, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
                self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        del self.__valid_fields['id']

    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        if self.__new_record:
            keys = self.__fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, values)
            _db.execute(query)


    def delete(self):
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)


class IneligibleHostQueue(DBObject):
    def __init__(self, id=None, row=None, new_record=None):
        fields = ['id', 'job_id', 'host_id']
        super(IneligibleHostQueue, self).__init__(fields, id=id,
                                                  row=row, new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'


class Host(DBObject):
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        super(Host, self).__init__(fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0]
            # print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
        # print "%s/%s looking for work" % (self.hostname, self.platform_id)
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
            """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        else:
            return [HostQueueEntry(row=i) for i in rows]


    def yield_work(self):
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()


    def set_status(self, status):
        print '%s -> %s' % (self.hostname, status)
        self.update_field('status', status)


class HostQueueEntry(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        super(HostQueueEntry, self).__init__(fields, id=id, row=row)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(now + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        print "creating block %s/%s" % (self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        print "removing block %s/%s" % (self.job.id, host_id)
        blocks = list(IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id)))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self, assigned_host=None):
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
            self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None, dont_delete_files=False):
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        if dont_delete_files:
            temp_dir = tempfile.mkdtemp(suffix='.clear_results')
            print 'Moving results from %s to %s' % (results_dir,
                                                    temp_dir)
        for filename in os.listdir(results_dir):
            path = os.path.join(results_dir, filename)
            if dont_delete_files:
                shutil.move(path,
                            os.path.join(temp_dir, filename))
            else:
                remove_file_or_dir(path)


class Job(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        super(Job, self).__init__(
            ['id', 'owner', 'name', 'priority',
             'control_file', 'control_type', 'created_on',
             'synch_type', 'synch_count', 'synchronizing'],
            id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        return self.control_type != 2


    def get_host_queue_entries(self):
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (count / self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left == 0


    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues=False)


    def stop_all_entries(self):
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        print "writing %s to job %s machines file" % (hostname, self.id)
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        mf.write("%s\n" % queue_entry.get_host().hostname)
        mf.close()


    def create_results_dir(self, queue_entry=None):
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry=queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-p', '-n',
                  '-r', os.path.abspath(results_dir),
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]
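        # For a hypothetical job, the list above produces an invocation along
        # the lines of (names and paths illustrative only):
        #   autoserv -p -n -r /results/42-user -b -u user -l my_job \
        #       -m host1,host2 /tmp/tmpXXXXXX
        # with '-c' appended just below for client-side (non-server) jobs.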

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job=self,
                               queue_entries=queue_entries,
                               cmd=params))

        ids = []
        for entry in queue_entries:
            ids.append(entry.id)

        agent = Agent(tasks, ids)

        return agent


if __name__ == '__main__':
    main()