#!/usr/bin/python -u

"""
Autotest scheduler
"""
__author__ = "Paul Turner <pjt@google.com>"

import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
import optparse, signal, smtplib, socket, datetime, stat, pwd, errno
from common import global_config

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
        AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
        sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min
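
# Pidfile protocol, as read by PidfileRunMonitor.read_pidfile() below: autoserv
# writes its own pid as the first line of .autoserv_execute in the results
# directory, and appends its exit status as a second line when it finishes.
# This is what lets a restarted scheduler reattach to runs it did not start.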

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False


def main():
        usage = 'usage: %prog [options] results_dir'

        parser = optparse.OptionParser(usage)
        parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                          action='store_true')
        parser.add_option('--logfile', help='Set a log file that all stdout ' +
                          'should be redirected to. Stderr will go to this ' +
                          'file + ".err"')
        parser.add_option('--notify', help='Set an email address to be ' +
                          'notified of exceptions')
        parser.add_option('--test', help='Indicate that scheduler is under ' +
                          'test and should use dummy autoserv and no parsing',
                          action='store_true')
        (options, args) = parser.parse_args()
        if len(args) != 1:
                parser.print_usage()
                return

        global RESULTS_DIR
        RESULTS_DIR = args[0]

        global _notify_email
        _notify_email = options.notify

        if options.test:
                global _autoserv_path
                _autoserv_path = 'autoserv_dummy'
                global _testing_mode
                _testing_mode = True

        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        try:
                while not _shutdown:
                        dispatcher.tick()
                        time.sleep(20)
        except:
                log_stacktrace("Uncaught exception; terminating monitor_db")

        _db.disconnect()


def handle_sigint(signum, frame):
        global _shutdown
        _shutdown = True
        print "Shutdown request received."


def init(logfile):
        if logfile:
                enable_logging(logfile)
        print "%s> dispatcher starting" % time.strftime("%X %x")
        print "My PID is %d" % os.getpid()

        os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
        global _db
        _db = DatabaseConn()

        print "Setting signal handler"
        signal.signal(signal.SIGINT, handle_sigint)

        print "Connected! Running..."


def enable_logging(logfile):
        out_file = logfile
        err_file = "%s.err" % logfile
        print "Enabling logging to %s (%s)" % (out_file, err_file)
        out_fd = open(out_file, "a", buffering=0)
        err_fd = open(err_file, "a", buffering=0)

        os.dup2(out_fd.fileno(), sys.stdout.fileno())
        os.dup2(err_fd.fileno(), sys.stderr.fileno())

        sys.stdout = out_fd
        sys.stderr = err_fd

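# Hosts eligible for new work, as selected by the query below: not attached to
# any active queue entry, with queued work targeted at them (directly by
# host_id or via a meta_host label), not locked, and with status NULL or
# 'Ready'.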
def idle_hosts():
        rows = _db.execute("""
                SELECT * FROM hosts h WHERE
                id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
                (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
                OR
                (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                        INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
                )
                AND locked=false AND (h.status IS null OR h.status='Ready') """)
        hosts = [Host(row=i) for i in rows]
        return hosts


def queue_entries_to_abort():
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE status='Abort';
                """)
        qe = [HostQueueEntry(row=i) for i in rows]
        return qe


def remove_file_or_dir(path):
        if stat.S_ISDIR(os.stat(path).st_mode):
                # directory
                shutil.rmtree(path)
        else:
                # file
                os.remove(path)


class DatabaseConn:
        def __init__(self):
                self.reconnect_wait = 20
                self.conn = None
                self.cur = None

                import MySQLdb.converters
                self.convert_dict = MySQLdb.converters.conversions
                self.convert_dict.setdefault(bool, self.convert_boolean)

                self.connect()


        @staticmethod
        def convert_boolean(boolean, conversion_dict):
                'Convert booleans to integer strings'
                return str(int(boolean))


        def connect(self):
                self.disconnect()

                # get global config and parse for info
                c = global_config.global_config
                dbase = "AUTOTEST_WEB"
                DB_HOST = c.get_config_value(dbase, "host")
                DB_SCHEMA = c.get_config_value(dbase, "database")

                global _testing_mode
                if _testing_mode:
                        DB_SCHEMA = 'stresstest_autotest_web'

                DB_USER = c.get_config_value(dbase, "user")
                DB_PASS = c.get_config_value(dbase, "password")

                while not self.conn:
                        try:
                                self.conn = MySQLdb.connect(
                                    host=DB_HOST, user=DB_USER, passwd=DB_PASS,
                                    db=DB_SCHEMA, conv=self.convert_dict)

                                self.conn.autocommit(True)
                                self.cur = self.conn.cursor()
                        except MySQLdb.OperationalError:
                                traceback.print_exc()
                                print "Can't connect to MYSQL; reconnecting"
                                time.sleep(self.reconnect_wait)
                                self.disconnect()


        def disconnect(self):
                if self.conn:
                        self.conn.close()
                self.conn = None
                self.cur = None


        def execute(self, *args, **dargs):
                while True:
                        try:
                                self.cur.execute(*args, **dargs)
                                return self.cur.fetchall()
                        except MySQLdb.OperationalError:
                                traceback.print_exc()
                                print "MYSQL connection died; reconnecting"
                                time.sleep(self.reconnect_wait)
                                self.connect()


def generate_parse_command(results_dir, flags=""):
        parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
        output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
        cmd = "%s %s -r -o %s > %s 2>&1 &"
        return cmd % (parse, flags, results_dir, output)


def parse_results(results_dir, flags=""):
        if _testing_mode:
                return
        os.system(generate_parse_command(results_dir, flags))


def send_notify_email(subject, message):
        if not _notify_email:
                return

        message = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                        time.strftime("%X %x"), message)
        sender = pwd.getpwuid(os.getuid())[0]  # see os.getlogin() online docs
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            sender, _notify_email, subject, message)
        mailer = smtplib.SMTP('localhost')
        mailer.sendmail(sender, _notify_email, msg)
        mailer.quit()


def log_stacktrace(reason):
        (type, value, tb) = sys.exc_info()
        str = "EXCEPTION: %s\n" % reason
        str += ''.join(traceback.format_exception(type, value, tb))

        sys.stderr.write("\n%s\n" % str)
        send_notify_email("monitor_db exception", str)


def get_proc_poll_fn(pid):
        proc_path = os.path.join('/proc', str(pid))
        def poll_fn():
                if os.path.exists(proc_path):
                        return None
                return 0  # we can't get a real exit code
        return poll_fn

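# kill_autoserv() sends SIGCONT before SIGTERM, presumably so that a stopped
# autoserv process is resumed and can actually handle the termination signal.
# A poll_fn returning None means "still running", mirroring Popen.poll().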
def kill_autoserv(pid, poll_fn=None):
        print 'killing', pid
        if poll_fn is None:
                poll_fn = get_proc_poll_fn(pid)
        if poll_fn() is None:
                os.kill(pid, signal.SIGCONT)
                os.kill(pid, signal.SIGTERM)

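# The Dispatcher is the top-level scheduler loop.  Each tick() invalidates the
# cached ps snapshot, processes entries marked 'Abort', assigns queued work to
# idle hosts (capped at 100 new entries per tick), and polls all running
# Agents, dropping the ones that have finished.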
class Dispatcher:
        autoserv_procs_cache = None

        def __init__(self):
                self._agents = []


        def do_initial_recovery(self, recover_hosts=True):
                # always recover processes
                self._recover_processes()

                if recover_hosts:
                        self._recover_hosts()


        def tick(self):
                Dispatcher.autoserv_procs_cache = None
                self._find_aborting()
                self._find_more_work()
                self._handle_agents()


        def add_agent(self, agent):
                self._agents.append(agent)
                agent.dispatcher = self

        # Find agent corresponding to the specified queue_entry
        def get_agents(self, queue_entry):
                res_agents = []
                for agent in self._agents:
                        if queue_entry.id in agent.queue_entry_ids:
                                res_agents.append(agent)
                return res_agents


        def remove_agent(self, agent):
                self._agents.remove(agent)

        @classmethod
        def find_autoservs(cls, orphans_only=False):
                """\
                Returns a dict mapping pids to command lines for root autoserv
                processes.  If orphans_only=True, return only processes that
                have been orphaned (i.e. parent pid = 1).
                """
                if cls.autoserv_procs_cache is not None:
                        return cls.autoserv_procs_cache

                proc = subprocess.Popen(
                    ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
                    stdout=subprocess.PIPE)
                # split each line into the five columns output by ps
                procs = [line.split(None, 4) for line in
                         proc.communicate()[0].splitlines()]
                autoserv_procs = {}
                for proc in procs:
                        # check ppid == 1 for orphans (ps fields are strings)
                        if orphans_only and proc[2] != '1':
                                continue
                        # only root autoserv processes have pgid == pid
                        if (proc[3] == 'autoserv' and   # comm
                            proc[1] == proc[0]):        # pgid == pid
                                # map pid to args
                                autoserv_procs[int(proc[0])] = proc[4]
                cls.autoserv_procs_cache = autoserv_procs
                return autoserv_procs


        def recover_queue_entry(self, queue_entry, run_monitor):
                job = queue_entry.job
                if job.is_synchronous():
                        all_queue_entries = job.get_host_queue_entries()
                else:
                        all_queue_entries = [queue_entry]
                all_queue_entry_ids = [entry.id for entry
                                       in all_queue_entries]
                queue_task = RecoveryQueueTask(
                    job=queue_entry.job,
                    queue_entries=all_queue_entries,
                    run_monitor=run_monitor)
                self.add_agent(Agent(tasks=[queue_task],
                                     queue_entry_ids=all_queue_entry_ids))


        def _recover_processes(self):
                orphans = self.find_autoservs(orphans_only=True)

                # first, recover running queue entries
                rows = _db.execute("""SELECT * FROM host_queue_entries
                                      WHERE status = 'Running'""")
                queue_entries = [HostQueueEntry(row=i) for i in rows]
                requeue_entries = []
                recovered_entry_ids = set()
                for queue_entry in queue_entries:
                        run_monitor = PidfileRunMonitor(
                            queue_entry.results_dir())
                        pid, exit_code = run_monitor.get_pidfile_info()
                        if pid is None:
                                # autoserv apparently never got run, so requeue
                                requeue_entries.append(queue_entry)
                                continue
                        if queue_entry.id in recovered_entry_ids:
                                # synchronous job we've already recovered
                                continue
                        print 'Recovering queue entry %d (pid %d)' % (
                            queue_entry.id, pid)
                        job = queue_entry.job
                        if job.is_synchronous():
                                for entry in job.get_host_queue_entries():
                                        assert entry.active
                                        recovered_entry_ids.add(entry.id)
                        self.recover_queue_entry(queue_entry,
                                                 run_monitor)
                        orphans.pop(pid, None)

                # and requeue other active queue entries
                rows = _db.execute("""SELECT * FROM host_queue_entries
                                      WHERE active AND NOT complete
                                      AND status != 'Running'
                                      AND status != 'Pending'
                                      AND status != 'Abort'
                                      AND status != 'Aborting'""")
                queue_entries = [HostQueueEntry(row=i) for i in rows]
                for queue_entry in queue_entries + requeue_entries:
                        print 'Requeuing running QE %d' % queue_entry.id
                        queue_entry.clear_results_dir(dont_delete_files=True)
                        queue_entry.requeue()


                # now kill any remaining autoserv processes
                for pid in orphans.keys():
                        print 'Killing orphan %d (%s)' % (pid, orphans[pid])
                        kill_autoserv(pid)

                # recover aborting tasks
                rebooting_host_ids = set()
                rows = _db.execute("""SELECT * FROM host_queue_entries
                                   WHERE status='Abort' or status='Aborting'""")
                queue_entries = [HostQueueEntry(row=i) for i in rows]
                for queue_entry in queue_entries:
                        print 'Recovering aborting QE %d' % queue_entry.id
                        queue_host = queue_entry.get_host()
                        reboot_task = RebootTask(queue_host)
                        verify_task = VerifyTask(host = queue_host)
                        self.add_agent(Agent(tasks=[reboot_task,
                                                    verify_task],
                                             queue_entry_ids=[queue_entry.id]))
                        queue_entry.set_status('Aborted')
                        # Secure the host from being picked up
                        queue_host.set_status('Rebooting')
                        rebooting_host_ids.add(queue_host.id)

                # reverify hosts that were in the middle of verify, repair or
                # reboot
                self._reverify_hosts_where("""(status = 'Repairing' OR
                                               status = 'Verifying' OR
                                               status = 'Rebooting')""",
                                           exclude_ids=rebooting_host_ids)

                # finally, recover "Running" hosts with no active queue entries,
                # although this should never happen
                message = ('Recovering running host %s - this probably '
                           'indicates a scheduler bug')
                self._reverify_hosts_where("""status = 'Running' AND
                                              id NOT IN (SELECT host_id
                                                         FROM host_queue_entries
                                                         WHERE active)""",
                                           print_message=message)


        def _reverify_hosts_where(self, where,
                                  print_message='Reverifying host %s',
                                  exclude_ids=set()):
                rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND ' +
                                   where)
                hosts = [Host(row=i) for i in rows]
                for host in hosts:
                        if host.id in exclude_ids:
                                continue
                        if print_message is not None:
                                print print_message % host.hostname
                        verify_task = VerifyTask(host = host)
                        self.add_agent(Agent(tasks = [verify_task]))


        def _recover_hosts(self):
                # recover "Repair Failed" hosts
                message = 'Reverifying dead host %s'
                self._reverify_hosts_where("status = 'Repair Failed'",
                                           print_message=message)


        def _find_more_work(self):
                print "finding work"

                num_started = 0
                for host in idle_hosts():
                        tasks = host.next_queue_entries()
                        if tasks:
                                for next in tasks:
                                        try:
                                                agent = next.run(assigned_host=host)
                                                if agent:
                                                        self.add_agent(agent)

                                                        num_started += 1
                                                        if num_started >= 100:
                                                                return
                                                break
                                        except:
                                                next.set_status('Failed')

#                                               if next.host:
#                                                       next.host.set_status('Ready')

                                                log_stacktrace("task_id = %d" % next.id)


        def _find_aborting(self):
                num_aborted = 0
                # Find jobs that are aborting
                for entry in queue_entries_to_abort():
                        agents_to_abort = self.get_agents(entry)
                        entry_host = entry.get_host()
                        reboot_task = RebootTask(entry_host)
                        verify_task = VerifyTask(host = entry_host)
                        tasks = [reboot_task, verify_task]
                        if agents_to_abort:
                                abort_task = AbortTask(entry, agents_to_abort)
                                tasks.insert(0, abort_task)
                        else:
                                entry.set_status('Aborted')
                                # just to make sure this host does not get
                                # taken away
                                entry_host.set_status('Rebooting')
                        self.add_agent(Agent(tasks=tasks,
                                             queue_entry_ids = [entry.id]))
                        num_aborted += 1
                        if num_aborted >= 50:
                                break


        def _handle_agents(self):
                still_running = []
                for agent in self._agents:
                        agent.tick()
                        if not agent.is_done():
                                still_running.append(agent)
                        else:
                                print "agent finished"
                self._agents = still_running

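# RunMonitor wraps a plain subprocess.Popen and is used for the short-lived
# autoserv invocations (verify, repair, reboot).  PidfileRunMonitor, below,
# instead tracks the main job autoserv through the .autoserv_execute pidfile,
# which is what allows a restarted scheduler to reattach to a job that is
# still running.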
class RunMonitor(object):
        def __init__(self, cmd, nice_level = None, log_file = None):
                self.nice_level = nice_level
                self.log_file = log_file
                self.proc = self.run(cmd)

        def run(self, cmd):
                if self.nice_level:
                        nice_cmd = ['nice', '-n', str(self.nice_level)]
                        nice_cmd.extend(cmd)
                        cmd = nice_cmd

                out_file = None
                if self.log_file:
                        try:
                                os.makedirs(os.path.dirname(self.log_file))
                        except OSError, exc:
                                if exc.errno != errno.EEXIST:
                                        log_stacktrace(
                                            'Unexpected error creating logfile '
                                            'directory for %s' % self.log_file)
                        try:
                                out_file = open(self.log_file, 'a')
                                out_file.write("\n%s\n" % ('*'*80))
                                out_file.write("%s> %s\n" %
                                               (time.strftime("%X %x"), cmd))
                                out_file.write("%s\n" % ('*'*80))
                        except (OSError, IOError):
                                log_stacktrace('Error opening log file %s' %
                                               self.log_file)

                if not out_file:
                        out_file = open('/dev/null', 'w')

                in_devnull = open('/dev/null', 'r')
                print "cmd = %s" % cmd
                print "path = %s" % os.getcwd()

                proc = subprocess.Popen(cmd, stdout=out_file,
                                        stderr=subprocess.STDOUT, stdin=in_devnull)
                out_file.close()
                in_devnull.close()
                return proc


        def get_pid(self):
                return self.proc.pid


        def kill(self):
                kill_autoserv(self.get_pid(), self.exit_code)


        def exit_code(self):
                return self.proc.poll()


class PidfileException(Exception):
        """\
        Raised when there's some unexpected behavior with the pid file.
        """


class PidfileRunMonitor(RunMonitor):
        def __init__(self, results_dir, cmd=None, nice_level=None,
                     log_file=None):
                self.results_dir = os.path.abspath(results_dir)
                self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
                self.lost_process = False
                self.start_time = time.time()
                if cmd is None:
                        # we're reattaching to an existing pid, so don't call
                        # the superconstructor (we don't want to kick off a new
                        # process)
                        pass
                else:
                        super(PidfileRunMonitor, self).__init__(cmd,
                                                                nice_level, log_file)


        def get_pid(self):
                pid, exit_status = self.get_pidfile_info()
                assert pid is not None
                return pid


        def _check_command_line(self, command_line, spacer=' ',
                                print_error=False):
                results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
                match = results_dir_arg in command_line
                if print_error and not match:
                        print '%s not found in %s' % (repr(results_dir_arg),
                                                      repr(command_line))
                return match


        def _check_proc_fs(self, pid):
                cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
                try:
                        cmdline_file = open(cmdline_path, 'r')
                        cmdline = cmdline_file.read().strip()
                        cmdline_file.close()
                except IOError:
                        return False
                # /proc/.../cmdline has \x00 separating args
                return self._check_command_line(cmdline, spacer='\x00',
                                                print_error=True)


        def read_pidfile(self):
                if not os.path.exists(self.pid_file):
                        return None, None
                file_obj = open(self.pid_file, 'r')
                lines = file_obj.readlines()
                file_obj.close()
                assert 1 <= len(lines) <= 2
                try:
                        pid = int(lines[0])
                        exit_status = None
                        if len(lines) == 2:
                                exit_status = int(lines[1])
                except ValueError, exc:
                        raise PidfileException('Corrupt pid file: ' +
                                               str(exc.args))

                return pid, exit_status


        def _find_autoserv_proc(self):
                autoserv_procs = Dispatcher.find_autoservs()
                for pid, args in autoserv_procs.iteritems():
                        if self._check_command_line(args):
                                return pid, args
                return None, None


        def get_pidfile_info(self):
                """\
                Returns:
                   None, None if autoserv has not yet run
                   pid,  None if autoserv is running
                   pid, exit_status if autoserv has completed
                """
                if self.lost_process:
                        return self.pid, self.exit_status

                pid, exit_status = self.read_pidfile()

                if pid is None:
                        return self._handle_no_pid()

                if exit_status is None:
                        # double check whether or not autoserv is running
                        proc_running = self._check_proc_fs(pid)
                        if proc_running:
                                return pid, exit_status

                        # pid but no process - maybe process *just* exited
                        pid, exit_status = self.read_pidfile()
                        if exit_status is None:
                                # autoserv exited without writing an exit code
                                # to the pidfile
                                error = ('autoserv died without writing exit '
                                         'code')
                                message = error + '\nPid: %s\nPidfile: %s' % (
                                    pid, self.pid_file)
                                print message
                                send_notify_email(error, message)
                                self.on_lost_process(pid)
                                return self.pid, self.exit_status

                return pid, exit_status


        def _handle_no_pid(self):
                """\
                Called when no pidfile is found or no pid is in the pidfile.
                """
                # is autoserv running?
                pid, args = self._find_autoserv_proc()
                if pid is None:
                        # no autoserv process running
                        message = 'No pid found at ' + self.pid_file
                else:
                        message = ("Process %d (%s) hasn't written pidfile %s" %
                                   (pid, args, self.pid_file))

                print message
                if time.time() - self.start_time > PIDFILE_TIMEOUT:
                        send_notify_email('Process has failed to write pidfile',
                                          message)
                        if pid is not None:
                                kill_autoserv(pid)
                        else:
                                pid = 0
                        self.on_lost_process(pid)
                        return self.pid, self.exit_status

                return None, None


        def on_lost_process(self, pid):
                """\
                Called when autoserv has exited without writing an exit status,
                or we've timed out waiting for autoserv to write a pid to the
                pidfile.  In either case, we just return failure and the caller
                should signal some kind of warning.

                pid is unimportant here, as it shouldn't be used by anyone.
                """
                self.lost_process = True
                self.pid = pid
                self.exit_status = 1


        def exit_code(self):
                pid, exit_code = self.get_pidfile_info()
                return exit_code

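# Scheduling model: an Agent owns a FIFO queue of AgentTasks and is ticked by
# the Dispatcher.  When the active task finishes unsuccessfully, the rest of
# the queue is discarded and the task's failure_tasks (e.g. a RepairTask for a
# failed VerifyTask) are queued in its place.  queue_entry_ids records which
# host_queue_entries the Agent is working on, so aborts can find it.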
class Agent(object):
        def __init__(self, tasks, queue_entry_ids=[]):
                self.active_task = None
                self.queue = Queue.Queue(0)
                self.dispatcher = None
                self.queue_entry_ids = queue_entry_ids

                for task in tasks:
                        self.add_task(task)


        def add_task(self, task):
                self.queue.put_nowait(task)
                task.agent = self


        def tick(self):
                print "agent tick"
                if self.active_task and not self.active_task.is_done():
                        self.active_task.poll()
                else:
                        self._next_task()


        def _next_task(self):
                print "agent picking task"
                if self.active_task:
                        assert self.active_task.is_done()

                        if not self.active_task.success:
                                self.on_task_failure()

                self.active_task = None
                if not self.is_done():
                        self.active_task = self.queue.get_nowait()
                        if self.active_task:
                                self.active_task.start()


        def on_task_failure(self):
                self.queue = Queue.Queue(0)
                for task in self.active_task.failure_tasks:
                        self.add_task(task)


        def is_done(self):
                return self.active_task is None and self.queue.empty()


        def start(self):
                assert self.dispatcher

                self._next_task()


class AgentTask(object):
        def __init__(self, cmd, failure_tasks = []):
                self.done = False
                self.failure_tasks = failure_tasks
                self.started = False
                self.cmd = cmd
                self.task = None
                self.agent = None
                self.monitor = None
                self.success = None


        def poll(self):
                print "poll"
                if self.monitor:
                        self.tick(self.monitor.exit_code())
                else:
                        self.finished(False)


        def tick(self, exit_code):
                if exit_code is None:
                        return
#               print "exit_code was %d" % exit_code
                if exit_code == 0:
                        success = True
                else:
                        success = False

                self.finished(success)


        def is_done(self):
                return self.done


        def finished(self, success):
                self.done = True
                self.success = success
                self.epilog()


        def prolog(self):
                pass


        def create_temp_resultsdir(self, suffix=''):
                self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


        def cleanup(self):
                if (hasattr(self, 'temp_results_dir') and
                    os.path.exists(self.temp_results_dir)):
                        shutil.rmtree(self.temp_results_dir)


        def epilog(self):
                self.cleanup()


        def start(self):
                assert self.agent

                if not self.started:
                        self.prolog()
                        self.run()

                self.started = True


        def abort(self):
                if self.monitor:
                        self.monitor.kill()
                self.done = True
                self.cleanup()


        def run(self):
                if self.cmd:
                        print "agent starting monitor"
                        log_file = None
                        if hasattr(self, 'host'):
                                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                                        self.host.hostname)
                        self.monitor = RunMonitor(
                            self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
                            log_file = log_file)


class RepairTask(AgentTask):
        def __init__(self, host, fail_queue_entry=None):
                """\
                fail_queue_entry: queue entry to mark failed if this repair
                fails.
                """
                self.create_temp_resultsdir('.repair')
                cmd = [_autoserv_path, '-R', '-m', host.hostname,
                       '-r', self.temp_results_dir]
                self.host = host
                self.fail_queue_entry = fail_queue_entry
                super(RepairTask, self).__init__(cmd)


        def prolog(self):
                print "repair_task starting"
                self.host.set_status('Repairing')


        def epilog(self):
                super(RepairTask, self).epilog()
                if self.success:
                        self.host.set_status('Ready')
                else:
                        self.host.set_status('Repair Failed')
                        if self.fail_queue_entry:
                                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
        def __init__(self, queue_entry=None, host=None):
                assert bool(queue_entry) != bool(host)

                self.host = host or queue_entry.host
                self.queue_entry = queue_entry

                self.create_temp_resultsdir('.verify')
                cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
                       '-r', self.temp_results_dir]

                fail_queue_entry = None
                if queue_entry and not queue_entry.meta_host:
                        fail_queue_entry = queue_entry
                failure_tasks = [RepairTask(self.host, fail_queue_entry)]

                super(VerifyTask, self).__init__(cmd,
                                                 failure_tasks=failure_tasks)


        def prolog(self):
                print "starting verify on %s" % (self.host.hostname)
                if self.queue_entry:
                        self.queue_entry.set_status('Verifying')
                        self.queue_entry.clear_results_dir(
                            self.queue_entry.verify_results_dir())
                self.host.set_status('Verifying')


        def cleanup(self):
                if not os.path.exists(self.temp_results_dir):
                        return
                if self.queue_entry and (self.success or
                                         not self.queue_entry.meta_host):
                        self.move_results()
                super(VerifyTask, self).cleanup()


        def epilog(self):
                super(VerifyTask, self).epilog()

                if self.success:
                        self.host.set_status('Ready')
                elif self.queue_entry:
                        self.queue_entry.requeue()


        def move_results(self):
                assert self.queue_entry is not None
                target_dir = self.queue_entry.verify_results_dir()
                if not os.path.exists(target_dir):
                        os.makedirs(target_dir)
                files = os.listdir(self.temp_results_dir)
                for filename in files:
                        if filename == AUTOSERV_PID_FILE:
                                continue
                        self.force_move(os.path.join(self.temp_results_dir,
                                                     filename),
                                        os.path.join(target_dir, filename))


        @staticmethod
        def force_move(source, dest):
                """\
                Replacement for shutil.move() that will delete the destination
                if it exists, even if it's a directory.
                """
                if os.path.exists(dest):
                        print ('Warning: removing existing destination file ' +
                               dest)
                        remove_file_or_dir(dest)
                shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
        def __init__(self, queue_entry):
                super(VerifySynchronousTask, self).__init__(
                    queue_entry = queue_entry)


        def epilog(self):
                super(VerifySynchronousTask, self).epilog()
                if self.success:
                        if self.queue_entry.job.num_complete() > 0:
                                # some other entry failed verify, and we've
                                # already been marked as stopped
                                return

                        self.queue_entry.set_status('Pending')
                        job = self.queue_entry.job
                        if job.is_ready():
                                agent = job.run(self.queue_entry)
                                self.agent.dispatcher.add_agent(agent)


class QueueTask(AgentTask):
        def __init__(self, job, queue_entries, cmd):
                super(QueueTask, self).__init__(cmd)
                self.job = job
                self.queue_entries = queue_entries


        @staticmethod
        def _write_keyval(results_dir, field, value):
                key_path = os.path.join(results_dir, 'keyval')
                keyval_file = open(key_path, 'a')
                print >> keyval_file, '%s=%d' % (field, value)
                keyval_file.close()


        def results_dir(self):
                return self.queue_entries[0].results_dir()


        def run(self):
                """\
                Override AgentTask.run() so we can use a PidfileRunMonitor.
                """
                self.monitor = PidfileRunMonitor(self.results_dir(),
                                                 cmd=self.cmd,
                                                 nice_level=AUTOSERV_NICE_LEVEL)


        def prolog(self):
                # write the parser commands into the results directories
                if self.job.is_synchronous() or self.job.num_machines() == 1:
                        results_dir = self.job.results_dir()
                        cmdfile = os.path.join(results_dir, '.parse.cmd')
                        cmd = generate_parse_command(results_dir, '-n')
                        print >> open(cmdfile, 'w'), cmd
                else:
                        for queue_entry in self.queue_entries:
                                results_dir = queue_entry.results_dir()
                                cmdfile = os.path.join(results_dir,
                                                       '.parse.cmd')
                                cmd = generate_parse_command(results_dir,
                                                             '-l 2 -n')
                                print >> open(cmdfile, 'w'), cmd

                # write some job timestamps into the job keyval file
                queued = time.mktime(self.job.created_on.timetuple())
                started = time.time()
                self._write_keyval(self.results_dir(), "job_queued", queued)
                self._write_keyval(self.results_dir(), "job_started", started)
                for queue_entry in self.queue_entries:
                        print "starting queue_task on %s/%s" % (
                            queue_entry.host.hostname, queue_entry.id)
                        queue_entry.set_status('Running')
                        queue_entry.host.set_status('Running')
                if (not self.job.is_synchronous() and
                    self.job.num_machines() > 1):
                        assert len(self.queue_entries) == 1
                        self.job.write_to_machines_file(self.queue_entries[0])


        def epilog(self):
                super(QueueTask, self).epilog()
                if self.success:
                        status = 'Completed'
                else:
                        status = 'Failed'

                # write another timestamp into the job keyval file
                finished = time.time()
                self._write_keyval(self.results_dir(), "job_finished", finished)
                for queue_entry in self.queue_entries:
                        queue_entry.set_status(status)
                        queue_entry.host.set_status('Ready')

                if self.job.is_synchronous() or self.job.num_machines() == 1:
                        if self.job.is_finished():
                                parse_results(self.job.results_dir())
                else:
                        for queue_entry in self.queue_entries:
                                parse_results(queue_entry.results_dir(),
                                              flags='-l 2')

                print "queue_task finished with %s/%s" % (status, self.success)


class RecoveryQueueTask(QueueTask):
        def __init__(self, job, queue_entries, run_monitor):
                super(RecoveryQueueTask, self).__init__(job,
                                                        queue_entries, cmd=None)
                self.run_monitor = run_monitor


        def run(self):
                self.monitor = self.run_monitor


        def prolog(self):
                # recovering an existing process - don't do prolog
                pass


class RebootTask(AgentTask):
        def __init__(self, host):
                global _autoserv_path

                # Current implementation of autoserv requires control file
                # to be passed on reboot action request. TODO: remove when no
                # longer appropriate.
                self.create_temp_resultsdir('.reboot')
                self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                            '-r', self.temp_results_dir, '/dev/null']
                self.host = host
                super(RebootTask, self).__init__(self.cmd,
                                                 failure_tasks=[RepairTask(host)])


        def prolog(self):
                print "starting reboot task for host: %s" % self.host.hostname
                self.host.set_status("Rebooting")


class AbortTask(AgentTask):
        def __init__(self, queue_entry, agents_to_abort):
                self.queue_entry = queue_entry
                self.agents_to_abort = agents_to_abort
                for agent in agents_to_abort:
                        agent.dispatcher.remove_agent(agent)
                super(AbortTask, self).__init__('')


        def prolog(self):
                print "starting abort on host %s, job %s" % (
                    self.queue_entry.host_id, self.queue_entry.job_id)
                self.queue_entry.set_status('Aborting')


        def epilog(self):
                super(AbortTask, self).epilog()
                self.queue_entry.set_status('Aborted')
                self.success = True


        def run(self):
                for agent in self.agents_to_abort:
                        if agent.active_task:
                                agent.active_task.abort()

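# DBObject is a minimal row wrapper: it loads a row either by id or from an
# already-fetched row tuple, exposes each column as an attribute, and provides
# update_field()/save()/delete() plus a fetch() generator for simple WHERE
# clauses.  Subclasses only need to supply the field list and _get_table().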
class DBObject(object):
        def __init__(self, fields, id=None, row=None, new_record=False):
                assert (bool(id) != bool(row)) and fields

                self.__table = self._get_table()
                self.__fields = fields

                self.__new_record = new_record

                if row is None:
                        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
                        rows = _db.execute(sql, (id,))
                        if len(rows) == 0:
                                raise Exception("row not found (table=%s, id=%s)"
                                                % (self.__table, id))
                        row = rows[0]

                assert len(row) == len(fields), (
                    "table = %s, row = %s/%d, fields = %s/%d" % (
                    self.__table, row, len(row), fields, len(fields)))

                self.__valid_fields = {}
                for i, value in enumerate(row):
                        self.__dict__[fields[i]] = value
                        self.__valid_fields[fields[i]] = True

                del self.__valid_fields['id']


        @classmethod
        def _get_table(cls):
                raise NotImplementedError('Subclasses must override this')


        def count(self, where, table = None):
                if not table:
                        table = self.__table

                rows = _db.execute("""
                        SELECT count(*) FROM %s
                        WHERE %s
                """ % (table, where))

                assert len(rows) == 1

                return int(rows[0][0])


        def num_cols(self):
                return len(self.__fields)


        def update_field(self, field, value):
                assert self.__valid_fields[field]

                if self.__dict__[field] == value:
                        return

                query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                    (self.__table, field)
                _db.execute(query, (value, self.id))

                self.__dict__[field] = value


        def save(self):
                if self.__new_record:
                        keys = self.__fields[1:]  # avoid id
                        columns = ','.join([str(key) for key in keys])
                        values = ['"%s"' % self.__dict__[key] for key in keys]
                        values = ','.join(values)
                        query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                                (self.__table, columns, values)
                        _db.execute(query)


        def delete(self):
                query = 'DELETE FROM %s WHERE id=%%s' % self.__table
                _db.execute(query, (self.id,))


        @classmethod
        def fetch(cls, where):
                rows = _db.execute(
                    'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
                for row in rows:
                        yield cls(row=row)


class IneligibleHostQueue(DBObject):
        def __init__(self, id=None, row=None, new_record=None):
                fields = ['id', 'job_id', 'host_id']
                super(IneligibleHostQueue, self).__init__(fields, id=id,
                                                          row=row, new_record=new_record)


        @classmethod
        def _get_table(cls):
                return 'ineligible_host_queues'


class Host(DBObject):
        def __init__(self, id=None, row=None):
                fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
                super(Host, self).__init__(fields, id=id, row=row)


        @classmethod
        def _get_table(cls):
                return 'hosts'


        def current_task(self):
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                        """, (self.id,))

                if len(rows) == 0:
                        return None
                else:
                        assert len(rows) == 1
                        results = rows[0]
#                       print "current = %s" % results
                        return HostQueueEntry(row=results)


        def next_queue_entries(self):
                if self.locked:
                        print "%s locked, not queuing" % self.hostname
                        return None
#               print "%s/%s looking for work" % (self.hostname, self.platform_id)
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries
                        WHERE ((host_id=%s) OR (meta_host IS NOT null AND
                        (meta_host IN (
                                SELECT label_id FROM hosts_labels WHERE host_id=%s
                                )
                        )
                        AND job_id NOT IN (
                                SELECT job_id FROM ineligible_host_queues
                                WHERE host_id=%s
                        )))
                        AND NOT complete AND NOT active
                        ORDER BY priority DESC, meta_host, id
                        LIMIT 1
                """, (self.id, self.id, self.id))

                if len(rows) == 0:
                        return None
                else:
                        return [HostQueueEntry(row=i) for i in rows]

        def yield_work(self):
                print "%s yielding work" % self.hostname
                if self.current_task():
                        self.current_task().requeue()

        def set_status(self, status):
                print '%s -> %s' % (self.hostname, status)
                self.update_field('status', status)

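# HostQueueEntry status values, as encoded in set_status() below:
#   'Queued'                                        -> not active, not complete
#   'Pending'/'Starting'/'Verifying'/'Running'/
#   'Abort'/'Aborting'                              -> active, not complete
#   'Failed'/'Completed'/'Stopped'/'Aborted'        -> complete, not active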
class HostQueueEntry(DBObject):
        def __init__(self, id=None, row=None):
                assert id or row
                fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                          'meta_host', 'active', 'complete']
                super(HostQueueEntry, self).__init__(fields, id=id, row=row)
                self.job = Job(self.job_id)

                if self.host_id:
                        self.host = Host(self.host_id)
                else:
                        self.host = None

                self.queue_log_path = os.path.join(self.job.results_dir(),
                                                   'queue.log.' + str(self.id))


        @classmethod
        def _get_table(cls):
                return 'host_queue_entries'


        def set_host(self, host):
                if host:
                        self.queue_log_record('Assigning host ' + host.hostname)
                        self.update_field('host_id', host.id)
                        self.update_field('active', True)
                        self.block_host(host.id)
                else:
                        self.queue_log_record('Releasing host')
                        self.unblock_host(self.host.id)
                        self.update_field('host_id', None)

                self.host = host


        def get_host(self):
                return self.host


        def queue_log_record(self, log_line):
                now = str(datetime.datetime.now())
                queue_log = open(self.queue_log_path, 'a', 0)
                queue_log.write(now + ' ' + log_line + '\n')
                queue_log.close()


        def block_host(self, host_id):
                print "creating block %s/%s" % (self.job.id, host_id)
                row = [0, self.job.id, host_id]
                block = IneligibleHostQueue(row=row, new_record=True)
                block.save()


        def unblock_host(self, host_id):
                print "removing block %s/%s" % (self.job.id, host_id)
                blocks = list(IneligibleHostQueue.fetch(
                    'job_id=%d and host_id=%d' % (self.job.id, host_id)))
                assert len(blocks) == 1
                blocks[0].delete()


        def results_dir(self):
                if self.job.is_synchronous() or self.job.num_machines() == 1:
                        return self.job.job_dir
                else:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)


        def verify_results_dir(self):
                if self.job.is_synchronous() or self.job.num_machines() > 1:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)
                else:
                        return self.job.job_dir


        def set_status(self, status):
                self.update_field('status', status)
                if self.host:
                        hostname = self.host.hostname
                else:
                        hostname = 'no host'
                print "%s/%d status -> %s" % (hostname, self.id, self.status)
                if status in ['Queued']:
                        self.update_field('complete', False)
                        self.update_field('active', False)

                if status in ['Pending', 'Running', 'Verifying', 'Starting',
                              'Abort', 'Aborting']:
                        self.update_field('complete', False)
                        self.update_field('active', True)

                if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
                        self.update_field('complete', True)
                        self.update_field('active', False)


        def run(self, assigned_host=None):
                if self.meta_host:
                        assert assigned_host
                        # ensure results dir exists for the queue log
                        self.job.create_results_dir()
                        self.set_host(assigned_host)

                print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                    self.meta_host, self.host.hostname, self.status)

                return self.job.run(queue_entry=self)

        def requeue(self):
                self.set_status('Queued')

                if self.meta_host:
                        self.set_host(None)


        def handle_host_failure(self):
                """\
                Called when this queue entry's host has failed verification and
                repair.
                """
                assert not self.meta_host
                self.set_status('Failed')
                if self.job.is_synchronous():
                        self.job.stop_all_entries()


        def clear_results_dir(self, results_dir=None, dont_delete_files=False):
                results_dir = results_dir or self.results_dir()
                if not os.path.exists(results_dir):
                        return
                if dont_delete_files:
                        temp_dir = tempfile.mkdtemp(suffix='.clear_results')
                        print 'Moving results from %s to %s' % (results_dir,
                                                                temp_dir)
                for filename in os.listdir(results_dir):
                        path = os.path.join(results_dir, filename)
                        if dont_delete_files:
                                shutil.move(path,
                                            os.path.join(temp_dir, filename))
                        else:
                                remove_file_or_dir(path)

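# Job.run() is where a queue entry turns into work: for synchronous jobs it
# waits until every entry is 'Pending' (is_ready()), then builds the autoserv
# command line and wraps it in an Agent of [VerifyTask (async jobs only),
# QueueTask].  synch_type == 2 marks a job as synchronous, and control_type
# distinguishes server-side from client-side control files.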
class Job(DBObject):
        def __init__(self, id=None, row=None):
                assert id or row
                super(Job, self).__init__(
                    ['id', 'owner', 'name', 'priority',
                     'control_file', 'control_type', 'created_on',
                     'synch_type', 'synch_count', 'synchronizing'],
                    id=id, row=row)

                self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                                    self.owner))


        @classmethod
        def _get_table(cls):
                return 'jobs'


        def is_server_job(self):
                return self.control_type != 2


        def get_host_queue_entries(self):
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries
                        WHERE job_id= %s
                """, (self.id,))
                entries = [HostQueueEntry(row=i) for i in rows]

                assert len(entries) > 0

                return entries


        def set_status(self, status, update_queues=False):
                self.update_field('status', status)

                if update_queues:
                        for queue_entry in self.get_host_queue_entries():
                                queue_entry.set_status(status)


        def is_synchronous(self):
                return self.synch_type == 2


        def is_ready(self):
                if not self.is_synchronous():
                        return True
                sql = "job_id=%s AND status='Pending'" % self.id
                count = self.count(sql, table='host_queue_entries')
                return (count == self.synch_count)


        def ready_to_synchronize(self):
                # heuristic: at least half of the entries are pending
                queue_entries = self.get_host_queue_entries()
                count = 0
                for queue_entry in queue_entries:
                        if queue_entry.status == 'Pending':
                                count += 1

                return (float(count) / self.synch_count >= 0.5)


        def start_synchronizing(self):
                self.update_field('synchronizing', True)


        def results_dir(self):
                return self.job_dir

        def num_machines(self, clause = None):
                sql = "job_id=%s" % self.id
                if clause:
                        sql += " AND (%s)" % clause
                return self.count(sql, table='host_queue_entries')


        def num_queued(self):
                return self.num_machines('not complete')


        def num_active(self):
                return self.num_machines('active')


        def num_complete(self):
                return self.num_machines('complete')


        def is_finished(self):
                left = self.num_queued()
                print "%s: %s machines left" % (self.name, left)
                return left == 0

        def stop_synchronizing(self):
                self.update_field('synchronizing', False)
                self.set_status('Queued', update_queues = False)


        def stop_all_entries(self):
                for child_entry in self.get_host_queue_entries():
                        if not child_entry.complete:
                                child_entry.set_status('Stopped')


        def write_to_machines_file(self, queue_entry):
                hostname = queue_entry.get_host().hostname
                print "writing %s to job %s machines file" % (hostname, self.id)
                file_path = os.path.join(self.job_dir, '.machines')
                mf = open(file_path, 'a')
                mf.write("%s\n" % queue_entry.get_host().hostname)
                mf.close()


        def create_results_dir(self, queue_entry=None):
                print "create: active: %s complete %s" % (self.num_active(),
                                                          self.num_complete())

                if not os.path.exists(self.job_dir):
                        os.makedirs(self.job_dir)

                if queue_entry:
                        return queue_entry.results_dir()
                return self.job_dir


        def run(self, queue_entry):
                results_dir = self.create_results_dir(queue_entry)

                if self.is_synchronous():
                        if not self.is_ready():
                                return Agent([VerifySynchronousTask(
                                                queue_entry = queue_entry)],
                                             [queue_entry.id])

                queue_entry.set_status('Starting')

                ctrl = open(os.tmpnam(), 'w')
                if self.control_file:
                        ctrl.write(self.control_file)
                else:
                        ctrl.write("")
                ctrl.flush()

                if self.is_synchronous():
                        queue_entries = self.get_host_queue_entries()
                else:
                        assert queue_entry
                        queue_entries = [queue_entry]
                hostnames = ','.join([entry.get_host().hostname
                                      for entry in queue_entries])

                params = [_autoserv_path, '-p', '-n',
                          '-r', os.path.abspath(results_dir),
                          '-b', '-u', self.owner, '-l', self.name,
                          '-m', hostnames, ctrl.name]

                if not self.is_server_job():
                        params.append('-c')

                tasks = []
                if not self.is_synchronous():
                        tasks.append(VerifyTask(queue_entry))

                tasks.append(QueueTask(job = self,
                                       queue_entries = queue_entries,
                                       cmd = params))

                ids = []
                for entry in queue_entries:
                        ids.append(entry.id)

                agent = Agent(tasks, ids)

                return agent


if __name__ == '__main__':
        main()