blob: 975eb4700457af615a12f5febfcb6b79e7c96c5c [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighcadb3532008-04-15 17:46:26 +00009import optparse, signal, smtplib, socket, datetime, stat, pwd, errno
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# allow the environment to override the autotest tree location
# (use the `in` operator; dict.has_key() is deprecated)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# module-level state, initialized by main()/init()
_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
mbligh36768f02008-02-22 18:28:33 +000034
35
def main():
    # Scheduler entry point: parse command-line options, configure the
    # module-level globals, then run the dispatcher loop until a
    # SIGINT-triggered shutdown is requested.
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    # under --test, swap in the dummy autoserv and skip results parsing
    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    # poll every 20 seconds until handle_sigint() flags shutdown
    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        # top-level boundary: report anything uncaught, then fall
        # through to a clean DB disconnect
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +000079
80
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main dispatcher loop to exit cleanly."""
    global _shutdown
    _shutdown = True
    sys.stdout.write("Shutdown request received.\n")
85
86
def init(logfile):
    # One-time scheduler setup: optional stdout/stderr redirection,
    # PATH adjustment, global DB connection and the SIGINT handler.
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    # make the server-side binaries findable by spawned processes
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
101
102
def enable_logging(logfile):
    # Redirect stdout to `logfile` and stderr to `logfile`.err,
    # both unbuffered and in append mode.
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    # replace the OS-level descriptors first, so child processes
    # inherit the redirection too
    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    # then rebind the Python-level objects for our own prints
    sys.stdout = out_fd
    sys.stderr = err_fd
115
116
def idle_hosts():
    # Return Host objects for every host that has no active queue entry
    # but does have pending work (either a direct entry or an eligible
    # meta-host entry via its labels), is unlocked, and is Ready or has
    # no status yet.
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in rows]
    return hosts
129
def queue_entries_to_abort():
    """Return a HostQueueEntry for every queue entry marked 'Abort'."""
    abort_rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    return [HostQueueEntry(row=abort_row) for abort_row in abort_rows]
mbligh36768f02008-02-22 18:28:33 +0000136
def remove_file_or_dir(path):
    """\
    Delete path from the filesystem.

    Directories are removed recursively; anything else (including a
    dangling symlink, which the old os.stat()-based check choked on)
    is unlinked.  Raises OSError if path does not exist.
    """
    if os.path.isdir(path):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)
144
145
class DatabaseConn:
    # Thin wrapper around a MySQLdb connection that (re)connects
    # forever and retries queries when the connection drops.
    def __init__(self):
        self.reconnect_wait = 20  # seconds between reconnect attempts
        self.conn = None
        self.cur = None

        # register a converter so Python bools are sent as '0'/'1'
        import MySQLdb.converters
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)

        self.connect()


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self):
        # Drop any existing connection, read DB settings from the
        # global config, then retry until a connection is established.
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        # under --test the scheduler talks to a separate schema
        global _testing_mode
        if _testing_mode:
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        # loop until connected; never gives up
        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=DB_HOST, user=DB_USER, passwd=DB_PASS,
                    db=DB_SCHEMA, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        # Close and clear connection/cursor; safe to call when
        # already disconnected.
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        # Run a query, transparently reconnecting and retrying forever
        # if the MySQL connection has died.  Returns all result rows.
        while (True):
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()
mbligh36768f02008-02-22 18:28:33 +0000213
214
def generate_parse_command(results_dir, flags=""):
    """Build the shell command that runs the TKO parser over
    results_dir in the background, logging to .parse.log."""
    parse_bin = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    log_path = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    template = "%s %s -r -o %s > %s 2>&1 &"
    return template % (parse_bin, flags, results_dir, log_path)
220
221
def parse_results(results_dir, flags=""):
    """Kick off a background parse of results_dir (no-op in testing
    mode, where there is nothing real to parse)."""
    if not _testing_mode:
        os.system(generate_parse_command(results_dir, flags))
mbligh36768f02008-02-22 18:28:33 +0000226
227
def send_notify_email(subject, message):
    """Mail `message` with `subject` to the configured _notify_email
    address via local SMTP; silently does nothing when no address is
    configured."""
    if not _notify_email:
        return

    # prefix the body with host / pid / timestamp for context
    stamp = "%s / %s / %s" % (socket.gethostname(), os.getpid(),
                              time.strftime("%X %x"))
    body = "%s\n%s" % (stamp, message)
    # local username; see os.getlogin() online docs for why not that
    sender = pwd.getpwuid(os.getuid())[0]
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, body)
    mailer = smtplib.SMTP('localhost')
    mailer.sendmail(sender, _notify_email, msg)
    mailer.quit()
240
241
def log_stacktrace(reason):
    """\
    Report the currently-handled exception: write `reason` plus the
    formatted traceback to stderr and email it to the notify address.
    Must be called from inside an except block.
    """
    # don't shadow the builtins `type` and `str` (the old code did)
    exc_type, exc_value, exc_tb = sys.exc_info()
    report = "EXCEPTION: %s\n" % reason
    report += ''.join(traceback.format_exception(exc_type, exc_value,
                                                 exc_tb))

    sys.stderr.write("\n%s\n" % report)
    send_notify_email("monitor_db exception", report)
mbligh36768f02008-02-22 18:28:33 +0000249
mblighbb421852008-03-11 22:36:16 +0000250
def get_proc_poll_fn(pid):
    """Return a Popen.poll()-style callable for a process we did not
    spawn: None while /proc/<pid> exists, 0 once it is gone (the real
    exit status of a non-child process is unknowable)."""
    proc_path = os.path.join('/proc', str(pid))

    def poll_fn():
        # None = still running; 0 = we can't get a real exit code
        return None if os.path.exists(proc_path) else 0
    return poll_fn
258
259
def kill_autoserv(pid, poll_fn=None):
    """Terminate process `pid` if it is still running.

    poll_fn mimics Popen.poll(): None means alive.  When not supplied,
    liveness is checked via /proc.
    """
    sys.stdout.write('killing %s\n' % (pid,))
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() is None:
        # SIGCONT first in case the process is stopped, then terminate
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)
mbligh36768f02008-02-22 18:28:33 +0000267
268
269class Dispatcher:
mbligh90a549d2008-03-25 23:52:34 +0000270 autoserv_procs_cache = None
271
mblighbb421852008-03-11 22:36:16 +0000272 def __init__(self):
mbligh36768f02008-02-22 18:28:33 +0000273 self._agents = []
mbligh36768f02008-02-22 18:28:33 +0000274
mbligh36768f02008-02-22 18:28:33 +0000275
mblighbb421852008-03-11 22:36:16 +0000276 def do_initial_recovery(self, recover_hosts=True):
277 # always recover processes
278 self._recover_processes()
279
280 if recover_hosts:
281 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000282
283
284 def tick(self):
mbligh90a549d2008-03-25 23:52:34 +0000285 Dispatcher.autoserv_procs_cache = None
mblighbb421852008-03-11 22:36:16 +0000286 self._find_aborting()
287 self._find_more_work()
mbligh36768f02008-02-22 18:28:33 +0000288 self._handle_agents()
289
290
291 def add_agent(self, agent):
292 self._agents.append(agent)
293 agent.dispatcher = self
mblighd5c95802008-03-05 00:33:46 +0000294
295 # Find agent corresponding to the specified queue_entry
296 def get_agents(self, queue_entry):
297 res_agents = []
298 for agent in self._agents:
299 if queue_entry.id in agent.queue_entry_ids:
300 res_agents.append(agent)
301 return res_agents
302
303
304 def remove_agent(self, agent):
305 self._agents.remove(agent)
mbligh36768f02008-02-22 18:28:33 +0000306
307
mbligh90a549d2008-03-25 23:52:34 +0000308 @classmethod
309 def find_autoservs(cls, orphans_only=False):
mblighbb421852008-03-11 22:36:16 +0000310 """\
311 Returns a dict mapping pids to command lines for root autoserv
mbligh90a549d2008-03-25 23:52:34 +0000312 processes. If orphans_only=True, return only processes that
313 have been orphaned (i.e. parent pid = 1).
mblighbb421852008-03-11 22:36:16 +0000314 """
mbligh90a549d2008-03-25 23:52:34 +0000315 if cls.autoserv_procs_cache is not None:
316 return cls.autoserv_procs_cache
317
mblighbb421852008-03-11 22:36:16 +0000318 proc = subprocess.Popen(
mbligh90a549d2008-03-25 23:52:34 +0000319 ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
mblighbb421852008-03-11 22:36:16 +0000320 stdout=subprocess.PIPE)
321 # split each line into the four columns output by ps
mbligh90a549d2008-03-25 23:52:34 +0000322 procs = [line.split(None, 4) for line in
mblighbb421852008-03-11 22:36:16 +0000323 proc.communicate()[0].splitlines()]
mbligh90a549d2008-03-25 23:52:34 +0000324 autoserv_procs = {}
325 for proc in procs:
326 # check ppid == 1 for orphans
327 if orphans_only and proc[2] != 1:
328 continue
329 # only root autoserv processes have pgid == pid
330 if (proc[3] == 'autoserv' and # comm
331 proc[1] == proc[0]): # pgid == pid
332 # map pid to args
333 autoserv_procs[int(proc[0])] = proc[4]
334 cls.autoserv_procs_cache = autoserv_procs
335 return autoserv_procs
mblighbb421852008-03-11 22:36:16 +0000336
337
338 def recover_queue_entry(self, queue_entry, run_monitor):
339 job = queue_entry.job
340 if job.is_synchronous():
341 all_queue_entries = job.get_host_queue_entries()
342 else:
343 all_queue_entries = [queue_entry]
344 all_queue_entry_ids = [queue_entry.id for queue_entry
345 in all_queue_entries]
346 queue_task = RecoveryQueueTask(
347 job=queue_entry.job,
348 queue_entries=all_queue_entries,
349 run_monitor=run_monitor)
350 self.add_agent(Agent(tasks=[queue_task],
351 queue_entry_ids=all_queue_entry_ids))
352
353
354 def _recover_processes(self):
mbligh90a549d2008-03-25 23:52:34 +0000355 orphans = self.find_autoservs(orphans_only=True)
mblighbb421852008-03-11 22:36:16 +0000356
357 # first, recover running queue entries
358 rows = _db.execute("""SELECT * FROM host_queue_entries
359 WHERE status = 'Running'""")
360 queue_entries = [HostQueueEntry(row=i) for i in rows]
361 requeue_entries = []
362 recovered_entry_ids = set()
363 for queue_entry in queue_entries:
364 run_monitor = PidfileRunMonitor(
365 queue_entry.results_dir())
366 pid, exit_code = run_monitor.get_pidfile_info()
367 if pid is None:
368 # autoserv apparently never got run, so requeue
369 requeue_entries.append(queue_entry)
370 continue
371 if queue_entry.id in recovered_entry_ids:
372 # synchronous job we've already recovered
373 continue
374 print 'Recovering queue entry %d (pid %d)' % (
375 queue_entry.id, pid)
376 job = queue_entry.job
377 if job.is_synchronous():
378 for entry in job.get_host_queue_entries():
379 assert entry.active
380 recovered_entry_ids.add(entry.id)
381 self.recover_queue_entry(queue_entry,
382 run_monitor)
383 orphans.pop(pid, None)
384
385 # and requeue other active queue entries
386 rows = _db.execute("""SELECT * FROM host_queue_entries
387 WHERE active AND NOT complete
388 AND status != 'Running'
389 AND status != 'Pending'
390 AND status != 'Abort'
391 AND status != 'Aborting'""")
392 queue_entries = [HostQueueEntry(row=i) for i in rows]
393 for queue_entry in queue_entries + requeue_entries:
394 print 'Requeuing running QE %d' % queue_entry.id
mbligh90a549d2008-03-25 23:52:34 +0000395 queue_entry.clear_results_dir(dont_delete_files=True)
mblighbb421852008-03-11 22:36:16 +0000396 queue_entry.requeue()
397
398
399 # now kill any remaining autoserv processes
400 for pid in orphans.keys():
401 print 'Killing orphan %d (%s)' % (pid, orphans[pid])
402 kill_autoserv(pid)
403
404 # recover aborting tasks
mbligh90a549d2008-03-25 23:52:34 +0000405 rebooting_host_ids = set()
mblighd5c95802008-03-05 00:33:46 +0000406 rows = _db.execute("""SELECT * FROM host_queue_entries
407 WHERE status='Abort' or status='Aborting'""")
mblighbb421852008-03-11 22:36:16 +0000408 queue_entries = [HostQueueEntry(row=i) for i in rows]
409 for queue_entry in queue_entries:
mbligh90a549d2008-03-25 23:52:34 +0000410 print 'Recovering aborting QE %d' % queue_entry.id
mblighbb421852008-03-11 22:36:16 +0000411 queue_host = queue_entry.get_host()
412 reboot_task = RebootTask(queue_host)
413 verify_task = VerifyTask(host = queue_host)
414 self.add_agent(Agent(tasks=[reboot_task,
415 verify_task],
416 queue_entry_ids=[queue_entry.id]))
417 queue_entry.set_status('Aborted')
418 # Secure the host from being picked up
419 queue_host.set_status('Rebooting')
mbligh90a549d2008-03-25 23:52:34 +0000420 rebooting_host_ids.add(queue_host.id)
mblighd5c95802008-03-05 00:33:46 +0000421
mblighbb421852008-03-11 22:36:16 +0000422 # reverify hosts that were in the middle of verify, repair or
423 # reboot
mbligh90a549d2008-03-25 23:52:34 +0000424 self._reverify_hosts_where("""(status = 'Repairing' OR
425 status = 'Verifying' OR
426 status = 'Rebooting')""",
427 exclude_ids=rebooting_host_ids)
428
429 # finally, recover "Running" hosts with no active queue entries,
430 # although this should never happen
431 message = ('Recovering running host %s - this probably '
432 'indicates a scheduler bug')
433 self._reverify_hosts_where("""status = 'Running' AND
434 id NOT IN (SELECT host_id
435 FROM host_queue_entries
436 WHERE active)""",
437 print_message=message)
438
439
440 def _reverify_hosts_where(self, where,
441 print_message='Reverifying host %s',
442 exclude_ids=set()):
443 rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND ' +
444 where)
mblighbb421852008-03-11 22:36:16 +0000445 hosts = [Host(row=i) for i in rows]
446 for host in hosts:
mbligh90a549d2008-03-25 23:52:34 +0000447 if host.id in exclude_ids:
448 continue
449 if print_message is not None:
450 print print_message % host.hostname
451 verify_task = VerifyTask(host = host)
452 self.add_agent(Agent(tasks = [verify_task]))
mblighbb421852008-03-11 22:36:16 +0000453
454
455 def _recover_hosts(self):
mbligh90a549d2008-03-25 23:52:34 +0000456 # recover "Repair Failed" hosts
457 message = 'Reverifying dead host %s'
458 self._reverify_hosts_where("status = 'Repair Failed'",
459 print_message=message)
mbligh36768f02008-02-22 18:28:33 +0000460
461
462 def _find_more_work(self):
463 print "finding work"
464
465 num_started = 0
466 for host in idle_hosts():
467 tasks = host.next_queue_entries()
468 if tasks:
469 for next in tasks:
470 try:
471 agent = next.run(assigned_host=host)
472 if agent:
473 self.add_agent(agent)
474
475 num_started += 1
476 if num_started>=100:
477 return
478 break
479 except:
480 next.set_status('Failed')
481
482# if next.host:
483# next.host.set_status('Ready')
484
485 log_stacktrace("task_id = %d" % next.id)
486
487
mblighd5c95802008-03-05 00:33:46 +0000488 def _find_aborting(self):
489 num_aborted = 0
490 # Find jobs that are aborting
491 for entry in queue_entries_to_abort():
492 agents_to_abort = self.get_agents(entry)
493 entry_host = entry.get_host()
494 reboot_task = RebootTask(entry_host)
495 verify_task = VerifyTask(host = entry_host)
496 tasks = [reboot_task, verify_task]
497 if agents_to_abort:
498 abort_task = AbortTask(entry, agents_to_abort)
499 tasks.insert(0, abort_task)
500 else:
501 entry.set_status('Aborted')
502 # just to make sure this host does not get
503 # taken away
504 entry_host.set_status('Rebooting')
505 self.add_agent(Agent(tasks=tasks,
506 queue_entry_ids = [entry.id]))
507 num_aborted += 1
508 if num_aborted >= 50:
509 break
510
511
mbligh36768f02008-02-22 18:28:33 +0000512 def _handle_agents(self):
513 still_running = []
514 for agent in self._agents:
515 agent.tick()
516 if not agent.is_done():
517 still_running.append(agent)
518 else:
519 print "agent finished"
520 self._agents = still_running
521
522
class RunMonitor(object):
    # Spawns a command (optionally nice'd, with stdout/stderr appended
    # to a log file) and exposes pid / exit-status queries for it.
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        # wrap the command with nice if requested
        if self.nice_level:
            nice_cmd = ['nice','-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                # an already-existing log dir is fine; anything else
                # is reported but not fatal
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                # write a banner separating runs in the shared log
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        # fall back to discarding output if no log file was opened
        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        # the child process holds its own references to these fds
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        # pass the (unbound-call) exit_code method as the poll
        # function so we only signal a still-running process
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        # Popen.poll(): None while running, else the exit status
        return self.proc.poll()
578
579
class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """
    pass
584
585
class PidfileRunMonitor(RunMonitor):
    # RunMonitor variant for autoserv, which records its pid and exit
    # status in a pidfile under the results dir.  With cmd=None it
    # reattaches to an already-running autoserv (scheduler restart)
    # instead of spawning a new process.
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()  # basis for PIDFILE_TIMEOUT
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            super(PidfileRunMonitor, self).__init__(cmd,
                                                    nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        # our autoserv's command line must contain "-r <results_dir>"
        # surrounded by argument separators
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self, pid):
        # True if /proc/<pid>/cmdline exists and names our results dir,
        # i.e. pid really is our autoserv and it is still running
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            # process is gone (or /proc entry unreadable)
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def read_pidfile(self):
        # Returns (pid, exit_status) from the pidfile: line one is the
        # pid, optional line two the exit status.  (None, None) when no
        # pidfile exists yet.  Raises on unparsable contents.
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise Exception('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status


    def _find_autoserv_proc(self):
        # scan all running root autoservs for one whose command line
        # matches our results dir
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
         None, None if autoserv has not yet run
         pid, None if autoserv is running
         pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            # already declared lost; report the synthetic failure
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                send_notify_email(error, message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        # only give up once the pidfile-write timeout has elapsed
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            send_notify_email('Process has failed to write pidfile',
                              message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1


    def exit_code(self):
        # exit status from the pidfile (None while still running)
        pid, exit_code = self.get_pidfile_info()
        return exit_code
740
741
mbligh36768f02008-02-22 18:28:33 +0000742class Agent(object):
mblighd5c95802008-03-05 00:33:46 +0000743 def __init__(self, tasks, queue_entry_ids=[]):
mbligh36768f02008-02-22 18:28:33 +0000744 self.active_task = None
745 self.queue = Queue.Queue(0)
746 self.dispatcher = None
mblighd5c95802008-03-05 00:33:46 +0000747 self.queue_entry_ids = queue_entry_ids
mbligh36768f02008-02-22 18:28:33 +0000748
749 for task in tasks:
750 self.add_task(task)
751
752
753 def add_task(self, task):
754 self.queue.put_nowait(task)
755 task.agent = self
756
757
758 def tick(self):
759 print "agent tick"
760 if self.active_task and not self.active_task.is_done():
761 self.active_task.poll()
762 else:
763 self._next_task();
764
765
766 def _next_task(self):
767 print "agent picking task"
768 if self.active_task:
769 assert self.active_task.is_done()
770
mblighe2586682008-02-29 22:45:46 +0000771 if not self.active_task.success:
772 self.on_task_failure()
773
mbligh36768f02008-02-22 18:28:33 +0000774 self.active_task = None
775 if not self.is_done():
776 self.active_task = self.queue.get_nowait()
777 if self.active_task:
778 self.active_task.start()
779
780
mblighe2586682008-02-29 22:45:46 +0000781 def on_task_failure(self):
mblighe2586682008-02-29 22:45:46 +0000782 self.queue = Queue.Queue(0)
783 for task in self.active_task.failure_tasks:
784 self.add_task(task)
mbligh16c722d2008-03-05 00:58:44 +0000785
mblighe2586682008-02-29 22:45:46 +0000786
mbligh36768f02008-02-22 18:28:33 +0000787 def is_done(self):
788 return self.active_task == None and self.queue.empty()
789
790
791 def start(self):
792 assert self.dispatcher
793
794 self._next_task()
795
mblighd5c95802008-03-05 00:33:46 +0000796
mbligh36768f02008-02-22 18:28:33 +0000797class AgentTask(object):
mbligh16c722d2008-03-05 00:58:44 +0000798 def __init__(self, cmd, failure_tasks = []):
mbligh36768f02008-02-22 18:28:33 +0000799 self.done = False
800 self.failure_tasks = failure_tasks
801 self.started = False
802 self.cmd = cmd
mblighd5c95802008-03-05 00:33:46 +0000803 self.task = None
mbligh36768f02008-02-22 18:28:33 +0000804 self.agent = None
mblighd5c95802008-03-05 00:33:46 +0000805 self.monitor = None
mblighd64e5702008-04-04 21:39:28 +0000806 self.success = None
mbligh36768f02008-02-22 18:28:33 +0000807
808
809 def poll(self):
810 print "poll"
mblighd5c95802008-03-05 00:33:46 +0000811 if self.monitor:
mbligh36768f02008-02-22 18:28:33 +0000812 self.tick(self.monitor.exit_code())
813 else:
814 self.finished(False)
815
816
817 def tick(self, exit_code):
818 if exit_code==None:
819 return
820# print "exit_code was %d" % exit_code
821 if exit_code == 0:
822 success = True
823 else:
824 success = False
825
826 self.finished(success)
827
828
829 def is_done(self):
830 return self.done
831
832
833 def finished(self, success):
834 self.done = True
835 self.success = success
836 self.epilog()
837
838
839 def prolog(self):
840 pass
841
mblighd64e5702008-04-04 21:39:28 +0000842
843 def create_temp_resultsdir(self, suffix=''):
844 self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)
845
846
847 def cleanup(self):
848 if (hasattr(self, 'temp_results_dir') and
849 os.path.exists(self.temp_results_dir)):
850 shutil.rmtree(self.temp_results_dir)
851
mbligh36768f02008-02-22 18:28:33 +0000852
853 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +0000854 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000855
856
857 def start(self):
858 assert self.agent
859
860 if not self.started:
861 self.prolog()
862 self.run()
863
864 self.started = True
865
mblighd64e5702008-04-04 21:39:28 +0000866
mbligh36768f02008-02-22 18:28:33 +0000867 def abort(self):
mblighd5c95802008-03-05 00:33:46 +0000868 if self.monitor:
869 self.monitor.kill()
870 self.done = True
mblighd64e5702008-04-04 21:39:28 +0000871 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000872
873
874 def run(self):
875 if self.cmd:
876 print "agent starting monitor"
mbligh36768f02008-02-22 18:28:33 +0000877 log_file = None
878 if hasattr(self, 'host'):
mblighbb421852008-03-11 22:36:16 +0000879 log_file = os.path.join(RESULTS_DIR, 'hosts',
880 self.host.hostname)
881 self.monitor = RunMonitor(
882 self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
883 log_file = log_file)
mbligh36768f02008-02-22 18:28:33 +0000884
885
class RepairTask(AgentTask):
    # Runs "autoserv -R" against a host; on failure marks the host
    # Repair Failed and optionally fails a queue entry with it.
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        # the temp dir must exist before building the command line
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path , '-R', '-m', host.hostname,
               '-r', self.temp_results_dir]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        super(RepairTask, self).__init__(cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            # the associated queue entry has nowhere else to run,
            # so fail it along with the host
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +0000913
914
915class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be supplied
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        # the temp dir must exist before building the command line
        self.create_temp_resultsdir('.verify')
        cmd = [_autoserv_path,'-v','-m',self.host.hostname,
               '-r', self.temp_results_dir]

        # only a host-specific (non-meta) entry is failed along with
        # its host; a meta-host entry can be rescheduled elsewhere
        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        super(VerifyTask, self).__init__(cmd,
                                         failure_tasks=failure_tasks)
mblighe2586682008-02-29 22:45:46 +0000933
934
mbligh36768f02008-02-22 18:28:33 +0000935 def prolog(self):
936 print "starting verify on %s" % (self.host.hostname)
937 if self.queue_entry:
938 self.queue_entry.set_status('Verifying')
mblighdffd6372008-02-29 22:47:33 +0000939 self.queue_entry.clear_results_dir(
940 self.queue_entry.verify_results_dir())
mbligh36768f02008-02-22 18:28:33 +0000941 self.host.set_status('Verifying')
942
943
mblighd64e5702008-04-04 21:39:28 +0000944 def cleanup(self):
945 if not os.path.exists(self.temp_results_dir):
946 return
mbligh36768f02008-02-22 18:28:33 +0000947 if self.queue_entry and (self.success or
mblighd64e5702008-04-04 21:39:28 +0000948 not self.queue_entry.meta_host):
mbligh36768f02008-02-22 18:28:33 +0000949 self.move_results()
mblighd64e5702008-04-04 21:39:28 +0000950 super(VerifyTask, self).cleanup()
951
952
953 def epilog(self):
954 super(VerifyTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +0000955
956 if self.success:
mbligh16c722d2008-03-05 00:58:44 +0000957 self.host.set_status('Ready')
958 elif self.queue_entry:
mblighdffd6372008-02-29 22:47:33 +0000959 self.queue_entry.requeue()
mbligh36768f02008-02-22 18:28:33 +0000960
961
962 def move_results(self):
963 assert self.queue_entry is not None
mblighe2586682008-02-29 22:45:46 +0000964 target_dir = self.queue_entry.verify_results_dir()
mbligh36768f02008-02-22 18:28:33 +0000965 if not os.path.exists(target_dir):
966 os.makedirs(target_dir)
967 files = os.listdir(self.temp_results_dir)
968 for filename in files:
mblighbb421852008-03-11 22:36:16 +0000969 if filename == AUTOSERV_PID_FILE:
970 continue
mblighe2586682008-02-29 22:45:46 +0000971 self.force_move(os.path.join(self.temp_results_dir,
972 filename),
973 os.path.join(target_dir, filename))
mbligh36768f02008-02-22 18:28:33 +0000974
975
mblighe2586682008-02-29 22:45:46 +0000976 @staticmethod
977 def force_move(source, dest):
978 """\
979 Replacement for shutil.move() that will delete the destination
980 if it exists, even if it's a directory.
981 """
982 if os.path.exists(dest):
983 print ('Warning: removing existing destination file ' +
984 dest)
985 remove_file_or_dir(dest)
986 shutil.move(source, dest)
987
988
mblighdffd6372008-02-29 22:47:33 +0000989class VerifySynchronousTask(VerifyTask):
990 def __init__(self, queue_entry):
mblighd64e5702008-04-04 21:39:28 +0000991 super(VerifySynchronousTask, self).__init__(
992 queue_entry = queue_entry)
mblighdffd6372008-02-29 22:47:33 +0000993
994
mbligh16c722d2008-03-05 00:58:44 +0000995 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +0000996 super(VerifySynchronousTask, self).epilog()
mbligh16c722d2008-03-05 00:58:44 +0000997 if self.success:
998 if self.queue_entry.job.num_complete() > 0:
999 # some other entry failed verify, and we've
1000 # already been marked as stopped
1001 return
mblighdffd6372008-02-29 22:47:33 +00001002
mbligh16c722d2008-03-05 00:58:44 +00001003 self.queue_entry.set_status('Pending')
1004 job = self.queue_entry.job
1005 if job.is_ready():
1006 agent = job.run(self.queue_entry)
1007 self.agent.dispatcher.add_agent(agent)
mblighe2586682008-02-29 22:45:46 +00001008
class QueueTask(AgentTask):
        """\
        Task that runs the main autoserv job process for one or more queue
        entries and records job lifecycle timestamps in the keyval file.
        """
        def __init__(self, job, queue_entries, cmd):
                super(QueueTask, self).__init__(cmd)
                self.job = job
                self.queue_entries = queue_entries


        @staticmethod
        def _write_keyval(results_dir, field, value):
                # append a single field=value line to the job keyval file
                # NOTE(review): '%d' truncates float timestamps to whole
                # seconds — confirm sub-second precision is not wanted
                key_path = os.path.join(results_dir, 'keyval')
                keyval_file = open(key_path, 'a')
                print >> keyval_file, '%s=%d' % (field, value)
                keyval_file.close()


        def results_dir(self):
                # all entries of one QueueTask share a results dir
                return self.queue_entries[0].results_dir()


        def run(self):
                """\
                Override AgentTask.run() so we can use a PidfileRunMonitor.
                """
                self.monitor = PidfileRunMonitor(self.results_dir(),
                                                 cmd=self.cmd,
                                                 nice_level=AUTOSERV_NICE_LEVEL)


        def prolog(self):
                # write some job timestamps into the job keyval file
                queued = time.mktime(self.job.created_on.timetuple())
                started = time.time()
                self._write_keyval(self.results_dir(), "job_queued", queued)
                self._write_keyval(self.results_dir(), "job_started", started)
                for queue_entry in self.queue_entries:
                        print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
                        queue_entry.set_status('Running')
                        queue_entry.host.set_status('Running')
                # asynchronous multi-machine jobs run one autoserv per host;
                # record this host in the job's .machines file
                if (not self.job.is_synchronous() and
                    self.job.num_machines() > 1):
                        assert len(self.queue_entries) == 1
                        self.job.write_to_machines_file(self.queue_entries[0])


        def epilog(self):
                super(QueueTask, self).epilog()
                if self.success:
                        status = 'Completed'
                else:
                        status = 'Failed'

                # write another timestamp into the job keyval file
                finished = time.time()
                self._write_keyval(self.results_dir(), "job_finished", finished)
                for queue_entry in self.queue_entries:
                        queue_entry.set_status(status)
                        queue_entry.host.set_status('Ready')

                # kick off results parsing: whole-job parse for synchronous
                # or single-machine jobs once everything is done, otherwise a
                # per-entry parse
                if self.job.is_synchronous() or self.job.num_machines()==1:
                        if self.job.is_finished():
                                parse_results(self.job.results_dir())
                else:
                        for queue_entry in self.queue_entries:
                                parse_results(queue_entry.results_dir(),
                                              flags='-l 2')

                print "queue_task finished with %s/%s" % (status, self.success)
1077
class RecoveryQueueTask(QueueTask):
        """\
        QueueTask variant that re-attaches to an autoserv process that was
        already running when the scheduler started.
        """
        def __init__(self, job, queue_entries, run_monitor):
                super(RecoveryQueueTask, self).__init__(job,
                                                        queue_entries, cmd=None)
                self.run_monitor = run_monitor


        def run(self):
                # reuse the recovered monitor rather than spawning autoserv
                self.monitor = self.run_monitor


        def prolog(self):
                # recovering an existing process - don't do prolog
                pass
1093
mbligh36768f02008-02-22 18:28:33 +00001094class RebootTask(AgentTask):
mblighd5c95802008-03-05 00:33:46 +00001095 def __init__(self, host):
1096 global _autoserv_path
1097
1098 # Current implementation of autoserv requires control file
1099 # to be passed on reboot action request. TODO: remove when no
1100 # longer appropriate.
mblighd64e5702008-04-04 21:39:28 +00001101 self.create_temp_resultsdir('.reboot')
mblighd5c95802008-03-05 00:33:46 +00001102 self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
mblighd64e5702008-04-04 21:39:28 +00001103 '-r', self.temp_results_dir, '/dev/null']
mbligh36768f02008-02-22 18:28:33 +00001104 self.host = host
mblighd64e5702008-04-04 21:39:28 +00001105 super(RebootTask, self).__init__(self.cmd,
mbligh16c722d2008-03-05 00:58:44 +00001106 failure_tasks=[RepairTask(host)])
1107
mblighd5c95802008-03-05 00:33:46 +00001108
1109 def prolog(self):
1110 print "starting reboot task for host: %s" % self.host.hostname
1111 self.host.set_status("Rebooting")
1112
mblighd5c95802008-03-05 00:33:46 +00001113
1114class AbortTask(AgentTask):
1115 def __init__(self, queue_entry, agents_to_abort):
1116 self.queue_entry = queue_entry
1117 self.agents_to_abort = agents_to_abort
1118 for agent in agents_to_abort:
1119 agent.dispatcher.remove_agent(agent)
mblighd64e5702008-04-04 21:39:28 +00001120 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001121
1122
mblighd5c95802008-03-05 00:33:46 +00001123 def prolog(self):
1124 print "starting abort on host %s, job %s" % (
1125 self.queue_entry.host_id, self.queue_entry.job_id)
1126 self.queue_entry.set_status('Aborting')
1127
mbligh36768f02008-02-22 18:28:33 +00001128
mblighd5c95802008-03-05 00:33:46 +00001129 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +00001130 super(AbortTask, self).epilog()
mblighd5c95802008-03-05 00:33:46 +00001131 self.queue_entry.set_status('Aborted')
1132 self.success = True
mbligh36768f02008-02-22 18:28:33 +00001133
mblighd64e5702008-04-04 21:39:28 +00001134
mbligh36768f02008-02-22 18:28:33 +00001135 def run(self):
mblighd5c95802008-03-05 00:33:46 +00001136 for agent in self.agents_to_abort:
1137 if (agent.active_task):
1138 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001139
1140
class DBObject(object):
        """\
        Lightweight ORM-style base class wrapping one database row.  Each
        column listed in `fields` becomes an attribute on the instance.
        Subclasses must override _get_table().
        """
        def __init__(self, fields, id=None, row=None, new_record=False):
                """\
                fields: ordered list of column names; the first must be 'id'
                id: primary key of the row to fetch
                row: a pre-fetched row sequence to use instead of querying
                new_record: if True, the row does not exist in the DB yet
                and can be inserted with save()

                Exactly one of id/row must be supplied.
                """
                assert (bool(id) != bool(row)) and fields

                self.__table = self._get_table()
                self.__fields = fields

                self.__new_record = new_record

                if row is None:
                        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
                        rows = _db.execute(sql, (id,))
                        if len(rows) == 0:
                                # raise a real exception object; raising a
                                # plain string is a TypeError on Python >= 2.6
                                raise ValueError(
                                    "row not found (table=%s, id=%s)" %
                                    (self.__table, id))
                        row = rows[0]

                assert len(row) == len(fields), (
                    "table = %s, row = %s/%d, fields = %s/%d" % (
                    self.__table, row, len(row), fields, len(fields)))

                # map each column value onto an instance attribute and track
                # which fields may legally be passed to update_field()
                self.__valid_fields = {}
                for i, value in enumerate(row):
                        self.__dict__[fields[i]] = value
                        self.__valid_fields[fields[i]] = True

                # the primary key must never be updated in place
                del self.__valid_fields['id']


        @classmethod
        def _get_table(cls):
                raise NotImplementedError('Subclasses must override this')


        def count(self, where, table=None):
                """\
                Return the number of rows matching `where` in `table`
                (defaults to this object's table).  Note: `where` is
                interpolated directly into the SQL, so callers must pass
                trusted clauses only.
                """
                if not table:
                        table = self.__table

                rows = _db.execute("""
                        SELECT count(*) FROM %s
                        WHERE %s
                """ % (table, where))

                assert len(rows) == 1

                return int(rows[0][0])


        def num_cols(self):
                """Return the number of columns this object wraps."""
                return len(self.__fields)


        def update_field(self, field, value):
                """Set `field` to `value` in both the DB and this object."""
                assert self.__valid_fields[field]

                if self.__dict__[field] == value:
                        return  # unchanged; skip the UPDATE

                query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                        (self.__table, field)
                _db.execute(query, (value, self.id))

                self.__dict__[field] = value


        def save(self):
                """Insert this object's row into the DB if it is new."""
                if self.__new_record:
                        keys = self.__fields[1:]  # avoid id
                        columns = ','.join([str(key) for key in keys])
                        # bind the values as query parameters instead of
                        # interpolating them quoted into the SQL string, so
                        # values containing quotes cannot corrupt the query
                        placeholders = ','.join(['%s'] * len(keys))
                        query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                                (self.__table, columns, placeholders)
                        _db.execute(query,
                                    [self.__dict__[key] for key in keys])


        def delete(self):
                """Delete this object's row from the DB."""
                query = 'DELETE FROM %s WHERE id=%%s' % self.__table
                _db.execute(query, (self.id,))


        @classmethod
        def fetch(cls, where):
                """Yield one instance per row matching the WHERE clause."""
                rows = _db.execute(
                    'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
                for row in rows:
                        yield cls(row=row)
mbligh36768f02008-02-22 18:28:33 +00001229
class IneligibleHostQueue(DBObject):
        """Row marking a host as blocked from taking a particular job."""
        def __init__(self, id=None, row=None, new_record=None):
                super(IneligibleHostQueue, self).__init__(
                    ['id', 'job_id', 'host_id'],
                    id=id, row=row, new_record=new_record)


        @classmethod
        def _get_table(cls):
                return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001240
1241
class Host(DBObject):
        """DB wrapper for a row of the hosts table."""
        def __init__(self, id=None, row=None):
                fields = ['id', 'hostname', 'locked', 'synch_id','status']
                super(Host, self).__init__(fields, id=id, row=row)


        @classmethod
        def _get_table(cls):
                return 'hosts'


        def current_task(self):
                """\
                Return the active, incomplete HostQueueEntry assigned to this
                host, or None.  At most one such entry may exist.
                """
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                        """, (self.id,))

                if len(rows) == 0:
                        return None
                else:
                        assert len(rows) == 1
                        results = rows[0];
#                       print "current = %s" % results
                        return HostQueueEntry(row=results)


        def next_queue_entries(self):
                """\
                Return the highest-priority pending queue entry this host is
                eligible for (as a one-element list), or None.  An entry is
                eligible if it targets this host directly, or via a metahost
                label the host carries and the job is not blocked for this
                host by an ineligible_host_queues row.
                """
                if self.locked:
                        print "%s locked, not queuing" % self.hostname
                        return None
#               print "%s/%s looking for work" % (self.hostname, self.platform_id)
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries
                        WHERE ((host_id=%s) OR (meta_host IS NOT null AND
                        (meta_host IN (
                                SELECT label_id FROM hosts_labels WHERE host_id=%s
                                )
                        )
                        AND job_id NOT IN (
                                SELECT job_id FROM ineligible_host_queues
                                WHERE host_id=%s
                        )))
                        AND NOT complete AND NOT active
                        ORDER BY priority DESC, meta_host, id
                        LIMIT 1
                        """, (self.id,self.id, self.id))

                if len(rows) == 0:
                        return None
                else:
                        return [HostQueueEntry(row=i) for i in rows]

        def yield_work(self):
                # requeue whatever this host is currently working on
                print "%s yielding work" % self.hostname
                if self.current_task():
                        self.current_task().requeue()

        def set_status(self,status):
                print '%s -> %s' % (self.hostname, status)
                self.update_field('status',status)
1301
1302
class HostQueueEntry(DBObject):
        """\
        DB wrapper for one row of host_queue_entries: the assignment of one
        host (or metahost label) to one job, plus its scheduling state.
        """
        def __init__(self, id=None, row=None):
                assert id or row
                fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                          'meta_host', 'active', 'complete']
                super(HostQueueEntry, self).__init__(fields, id=id, row=row)
                self.job = Job(self.job_id)

                if self.host_id:
                        self.host = Host(self.host_id)
                else:
                        self.host = None

                # per-entry scheduling log kept inside the job's results dir
                self.queue_log_path = os.path.join(self.job.results_dir(),
                                                   'queue.log.' + str(self.id))


        @classmethod
        def _get_table(cls):
                return 'host_queue_entries'


        def set_host(self, host):
                """\
                Assign `host` to this entry (activating it and blocking the
                host from other entries of this job), or release the current
                host when `host` is None.
                """
                if host:
                        self.queue_log_record('Assigning host ' + host.hostname)
                        self.update_field('host_id', host.id)
                        self.update_field('active', True)
                        self.block_host(host.id)
                else:
                        self.queue_log_record('Releasing host')
                        self.unblock_host(self.host.id)
                        self.update_field('host_id', None)

                self.host = host


        def get_host(self):
                return self.host


        def queue_log_record(self, log_line):
                # append a timestamped line to the entry's queue log
                # (opened unbuffered so lines hit disk immediately)
                now = str(datetime.datetime.now())
                queue_log = open(self.queue_log_path, 'a', 0)
                queue_log.write(now + ' ' + log_line + '\n')
                queue_log.close()


        def block_host(self, host_id):
                """Record that this job may not be scheduled on host_id again."""
                print "creating block %s/%s" % (self.job.id, host_id)
                row = [0, self.job.id, host_id]
                block = IneligibleHostQueue(row=row, new_record=True)
                block.save()


        def unblock_host(self, host_id):
                """Remove the (job, host) block created by block_host()."""
                print "removing block %s/%s" % (self.job.id, host_id)
                blocks = list(IneligibleHostQueue.fetch(
                    'job_id=%d and host_id=%d' % (self.job.id, host_id)))
                assert len(blocks) == 1
                blocks[0].delete()


        def results_dir(self):
                # synchronous and single-machine jobs share the job dir;
                # asynchronous multi-machine jobs get a per-host subdir
                if self.job.is_synchronous() or self.job.num_machines() == 1:
                        return self.job.job_dir
                else:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)


        def verify_results_dir(self):
                # note: the opposite split from results_dir() — verify output
                # goes in a per-host subdir for multi-machine jobs
                if self.job.is_synchronous() or self.job.num_machines() > 1:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)
                else:
                        return self.job.job_dir


        def set_status(self, status):
                """\
                Update this entry's status and keep the derived active/
                complete flags consistent with it.
                """
                self.update_field('status', status)
                if self.host:
                        hostname = self.host.hostname
                else:
                        hostname = 'no host'
                print "%s/%d status -> %s" % (hostname, self.id, self.status)
                if status in ['Queued']:
                        self.update_field('complete', False)
                        self.update_field('active', False)

                if status in ['Pending', 'Running', 'Verifying', 'Starting',
                              'Abort', 'Aborting']:
                        self.update_field('complete', False)
                        self.update_field('active', True)

                if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
                        self.update_field('complete', True)
                        self.update_field('active', False)


        def run(self,assigned_host=None):
                """\
                Start this entry's job; metahost entries must be given the
                concrete host to run on.  Returns the Agent created by
                Job.run().
                """
                if self.meta_host:
                        assert assigned_host
                        # ensure results dir exists for the queue log
                        self.job.create_results_dir()
                        self.set_host(assigned_host)

                print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                                self.meta_host, self.host.hostname, self.status)

                return self.job.run(queue_entry=self)

        def requeue(self):
                """Put this entry back in the queue (releasing a metahost)."""
                self.set_status('Queued')

                if self.meta_host:
                        self.set_host(None)


        def handle_host_failure(self):
                """\
                Called when this queue entry's host has failed verification and
                repair.
                """
                assert not self.meta_host
                self.set_status('Failed')
                if self.job.is_synchronous():
                        self.job.stop_all_entries()


        def clear_results_dir(self, results_dir=None, dont_delete_files=False):
                """\
                Empty the given results dir (defaults to this entry's).  With
                dont_delete_files, the contents are moved to a fresh temp dir
                instead of being removed.
                """
                results_dir = results_dir or self.results_dir()
                if not os.path.exists(results_dir):
                        return
                if dont_delete_files:
                        temp_dir = tempfile.mkdtemp(suffix='.clear_results')
                        print 'Moving results from %s to %s' % (results_dir,
                                                                temp_dir)
                for filename in os.listdir(results_dir):
                        path = os.path.join(results_dir, filename)
                        if dont_delete_files:
                                shutil.move(path,
                                            os.path.join(temp_dir, filename))
                        else:
                                remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001449
1450
1451class Job(DBObject):
1452 def __init__(self, id=None, row=None):
1453 assert id or row
mblighd64e5702008-04-04 21:39:28 +00001454 super(Job, self).__init__(
mblighe2586682008-02-29 22:45:46 +00001455 ['id','owner','name','priority',
1456 'control_file','control_type','created_on',
1457 'synch_type', 'synch_count','synchronizing'],
1458 id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00001459
mblighe2586682008-02-29 22:45:46 +00001460 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
1461 self.owner))
1462
1463
1464 @classmethod
1465 def _get_table(cls):
1466 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00001467
1468
1469 def is_server_job(self):
1470 return self.control_type != 2
1471
1472
1473 def get_host_queue_entries(self):
mbligh6f8bab42008-02-29 22:45:14 +00001474 rows = _db.execute("""
mbligh36768f02008-02-22 18:28:33 +00001475 SELECT * FROM host_queue_entries
1476 WHERE job_id= %s
1477 """, (self.id,))
mbligh6f8bab42008-02-29 22:45:14 +00001478 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00001479
1480 assert len(entries)>0
1481
1482 return entries
1483
1484
1485 def set_status(self, status, update_queues=False):
1486 self.update_field('status',status)
1487
1488 if update_queues:
1489 for queue_entry in self.get_host_queue_entries():
1490 queue_entry.set_status(status)
1491
1492
1493 def is_synchronous(self):
1494 return self.synch_type == 2
1495
1496
1497 def is_ready(self):
1498 if not self.is_synchronous():
1499 return True
1500 sql = "job_id=%s AND status='Pending'" % self.id
1501 count = self.count(sql, table='host_queue_entries')
1502 return (count == self.synch_count)
1503
1504
1505 def ready_to_synchronize(self):
1506 # heuristic
1507 queue_entries = self.get_host_queue_entries()
1508 count = 0
1509 for queue_entry in queue_entries:
1510 if queue_entry.status == 'Pending':
1511 count += 1
1512
1513 return (count/self.synch_count >= 0.5)
1514
1515
1516 def start_synchronizing(self):
1517 self.update_field('synchronizing', True)
1518
1519
1520 def results_dir(self):
1521 return self.job_dir
1522
1523 def num_machines(self, clause = None):
1524 sql = "job_id=%s" % self.id
1525 if clause:
1526 sql += " AND (%s)" % clause
1527 return self.count(sql, table='host_queue_entries')
1528
1529
1530 def num_queued(self):
1531 return self.num_machines('not complete')
1532
1533
1534 def num_active(self):
1535 return self.num_machines('active')
1536
1537
1538 def num_complete(self):
1539 return self.num_machines('complete')
1540
1541
1542 def is_finished(self):
1543 left = self.num_queued()
1544 print "%s: %s machines left" % (self.name, left)
1545 return left==0
1546
1547 def stop_synchronizing(self):
1548 self.update_field('synchronizing', False)
1549 self.set_status('Queued', update_queues = False)
1550
1551
mblighe2586682008-02-29 22:45:46 +00001552 def stop_all_entries(self):
1553 for child_entry in self.get_host_queue_entries():
1554 if not child_entry.complete:
1555 child_entry.set_status('Stopped')
1556
1557
1558 def write_to_machines_file(self, queue_entry):
1559 hostname = queue_entry.get_host().hostname
1560 print "writing %s to job %s machines file" % (hostname, self.id)
1561 file_path = os.path.join(self.job_dir, '.machines')
1562 mf = open(file_path, 'a')
1563 mf.write("%s\n" % queue_entry.get_host().hostname)
1564 mf.close()
mbligh36768f02008-02-22 18:28:33 +00001565
1566
1567 def create_results_dir(self, queue_entry=None):
1568 print "create: active: %s complete %s" % (self.num_active(),
1569 self.num_complete())
1570
1571 if not os.path.exists(self.job_dir):
1572 os.makedirs(self.job_dir)
1573
1574 if queue_entry:
1575 return queue_entry.results_dir()
1576 return self.job_dir
1577
1578
1579 def run(self, queue_entry):
1580 results_dir = self.create_results_dir(queue_entry)
1581
1582 if self.is_synchronous():
1583 if not self.is_ready():
mblighd5c95802008-03-05 00:33:46 +00001584 return Agent([VerifySynchronousTask(
1585 queue_entry = queue_entry)],
1586 [queue_entry.id])
mbligh36768f02008-02-22 18:28:33 +00001587
1588 queue_entry.set_status('Starting')
1589
1590 ctrl = open(os.tmpnam(), 'w')
1591 if self.control_file:
1592 ctrl.write(self.control_file)
1593 else:
1594 ctrl.write("")
1595 ctrl.flush()
1596
1597 if self.is_synchronous():
mbligh36768f02008-02-22 18:28:33 +00001598 queue_entries = self.get_host_queue_entries()
1599 else:
1600 assert queue_entry
mbligh36768f02008-02-22 18:28:33 +00001601 queue_entries = [queue_entry]
mblighe2586682008-02-29 22:45:46 +00001602 hostnames = ','.join([entry.get_host().hostname
1603 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00001604
mbligh6437ff52008-04-17 15:24:38 +00001605 # determine the job tag
1606 if self.is_synchronous() or self.num_machines() == 1:
1607 job_name = "%s-%s" % (self.id, self.owner)
1608 else:
1609 job_name = "%s-%s/%s" % (self.id, self.owner,
1610 hostnames)
1611
1612 params = [_autoserv_path, '-P', job_name, '-p', '-n',
mblighbb421852008-03-11 22:36:16 +00001613 '-r', os.path.abspath(results_dir),
1614 '-b', '-u', self.owner, '-l', self.name,
1615 '-m', hostnames, ctrl.name]
mbligh36768f02008-02-22 18:28:33 +00001616
1617 if not self.is_server_job():
1618 params.append('-c')
1619
1620 tasks = []
1621 if not self.is_synchronous():
1622 tasks.append(VerifyTask(queue_entry))
mblighe2586682008-02-29 22:45:46 +00001623
1624 tasks.append(QueueTask(job = self,
1625 queue_entries = queue_entries,
1626 cmd = params))
mbligh36768f02008-02-22 18:28:33 +00001627
mblighd5c95802008-03-05 00:33:46 +00001628 ids = []
1629 for entry in queue_entries:
1630 ids.append(entry.id)
1631
1632 agent = Agent(tasks, ids)
mbligh36768f02008-02-22 18:28:33 +00001633
1634 return agent
1635
1636
# entry point: run the scheduler when executed as a script
if __name__ == '__main__':
        main()