blob: a2670f327f163de63eb13fb7f911708be146dad9 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighcadb3532008-04-15 17:46:26 +00009import optparse, signal, smtplib, socket, datetime, stat, pwd, errno
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Directory job results are written under; overwritten in main() with the
# required results_dir command-line argument.
RESULTS_DIR = '.'
# 'nice' increment applied to spawned autoserv processes.
AUTOSERV_NICE_LEVEL = 10

# Root of the autotest tree (parent of this scheduler directory), unless
# overridden by $AUTOTEST_DIR below.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# make server-side autotest modules importable
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# name of the pidfile autoserv writes into its results directory
AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Module-level state, filled in by main()/init():
_db = None                     # shared DatabaseConn instance
_shutdown = False              # set by SIGINT handler; stops the main loop
_notify_email = None           # address to mail exceptions to (--notify)
_autoserv_path = 'autoserv'    # binary to spawn (dummy under --test)
_testing_mode = False          # --test: dummy autoserv, test DB, no parsing
mbligh36768f02008-02-22 18:28:33 +000034
35
def main():
    """\
    Entry point: parse command-line options, initialize logging and the
    database connection, recover state from any previous scheduler run,
    then loop calling Dispatcher.tick() every 20 seconds until SIGINT
    sets _shutdown.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        # exactly one positional argument (results_dir) is required
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        # swap in the dummy autoserv and flag test mode (test DB schema,
        # parsing disabled)
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        # top-level boundary: log (and mail, if configured) any uncaught
        # exception before shutting down
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +000079
80
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop in main() to exit cleanly."""
    global _shutdown
    _shutdown = True
    print "Shutdown request received."
85
86
def init(logfile):
    """\
    One-time startup: redirect output to logfile (if given), put the
    autotest server dir on $PATH, open the global DB connection and
    install the SIGINT handler.
    """
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    # let spawned tools (autoserv etc.) be found without full paths
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
101
102
def enable_logging(logfile):
    """\
    Redirect stdout to logfile and stderr to logfile + '.err', both
    unbuffered and opened in append mode.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    # dup2 onto fds 1/2 so child processes inherit the redirection too
    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    # also rebind the Python-level objects for direct writes
    sys.stdout = out_file
    sys.stderr = err_fd
115
116
def idle_hosts():
    """\
    Return Host objects for every unlocked, valid host that is Ready (or
    has no status), has no active queue entry, and has work waiting for
    it either directly or through a meta-host label.
    """
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
        (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
        OR
        (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND invalid=false
        AND (h.status IS null OR h.status='Ready') """)
    return [Host(row=row) for row in rows]
130
def queue_entries_to_abort():
    """Return a HostQueueEntry for every entry whose status is 'Abort'."""
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000137
def remove_file_or_dir(path):
    """\
    Delete path, whether it is a file, a symlink or a directory tree.

    Uses lstat (not stat) so that a symlink pointing at a directory is
    removed as a link: shutil.rmtree refuses to operate on symlinks, and
    following the link would wrongly delete the target's contents.
    """
    if stat.S_ISDIR(os.lstat(path).st_mode):
        # real directory - remove it and everything beneath it
        shutil.rmtree(path)
    else:
        # plain file or symlink
        os.remove(path)
145
146
class DatabaseConn:
    """\
    Auto-reconnecting wrapper around a MySQLdb connection to the
    AUTOTEST_WEB database.  connect() and execute() both retry forever,
    sleeping reconnect_wait seconds between attempts.
    """
    def __init__(self):
        # seconds to sleep between reconnection attempts
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        # register a bool -> "0"/"1" converter so boolean parameters are
        # rendered as integer strings in queries
        import MySQLdb.converters
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)

        self.connect()


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self):
        """(Re)open the connection, retrying until it succeeds."""
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        global _testing_mode
        if _testing_mode:
            # --test mode points at a separate schema so real data is safe
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=DB_HOST, user=DB_USER, passwd=DB_PASS,
                    db=DB_SCHEMA, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        """Close the connection (if open) and clear conn/cursor state."""
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        """\
        Run a query and return all rows, reconnecting and retrying
        forever if the connection has died.
        """
        while (True):
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()
mbligh36768f02008-02-22 18:28:33 +0000214
215
def generate_parse_command(results_dir, flags=""):
    """\
    Build the shell command line that runs the TKO parser over
    results_dir in the background, logging to .parse.log inside it.
    """
    parse_bin = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    log_path = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    return "%s %s -r -o %s > %s 2>&1 &" % (parse_bin, flags,
                                           results_dir, log_path)
221
222
def parse_results(results_dir, flags=""):
    """Kick off background parsing of results_dir; no-op in test mode."""
    if not _testing_mode:
        os.system(generate_parse_command(results_dir, flags))
mbligh36768f02008-02-22 18:28:33 +0000227
228
def send_notify_email(subject, message):
    """\
    Mail message to the configured _notify_email address via the local
    SMTP server.  Does nothing when no notify address is set.
    """
    if not _notify_email:
        return

    # prefix the body with hostname / pid / timestamp for context
    message = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                    time.strftime("%X %x"), message)
    sender = pwd.getpwuid(os.getuid())[0]  # see os.getlogin() online docs
    email_text = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, message)
    smtp_conn = smtplib.SMTP('localhost')
    smtp_conn.sendmail(sender, _notify_email, email_text)
    smtp_conn.quit()
241
242
def log_stacktrace(reason):
    """\
    Write the current exception's traceback (prefixed with reason) to
    stderr and mail it to the notify address, if one is configured.

    Must be called from within an except block so sys.exc_info() has an
    exception to report.
    """
    # renamed locals: the originals shadowed the builtins 'type' and 'str'
    (exc_type, exc_value, exc_tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value,
                                                  exc_tb))

    sys.stderr.write("\n%s\n" % message)
    send_notify_email("monitor_db exception", message)
mbligh36768f02008-02-22 18:28:33 +0000250
mblighbb421852008-03-11 22:36:16 +0000251
def get_proc_poll_fn(pid):
    """\
    Return a Popen.poll()-style callable for a process we did not spawn:
    it yields None while /proc/<pid> exists (process alive) and 0 once
    it is gone (the real exit code is unknowable from here).
    """
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        return None if os.path.exists(proc_path) else 0
    return poll_fn
259
260
261def kill_autoserv(pid, poll_fn=None):
262 print 'killing', pid
263 if poll_fn is None:
264 poll_fn = get_proc_poll_fn(pid)
265 if poll_fn() == None:
266 os.kill(pid, signal.SIGCONT)
267 os.kill(pid, signal.SIGTERM)
mbligh36768f02008-02-22 18:28:33 +0000268
269
270class Dispatcher:
mbligh90a549d2008-03-25 23:52:34 +0000271 autoserv_procs_cache = None
272
mblighbb421852008-03-11 22:36:16 +0000273 def __init__(self):
mbligh36768f02008-02-22 18:28:33 +0000274 self._agents = []
mbligh36768f02008-02-22 18:28:33 +0000275
mbligh36768f02008-02-22 18:28:33 +0000276
mblighbb421852008-03-11 22:36:16 +0000277 def do_initial_recovery(self, recover_hosts=True):
278 # always recover processes
279 self._recover_processes()
280
281 if recover_hosts:
282 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000283
284
285 def tick(self):
mbligh90a549d2008-03-25 23:52:34 +0000286 Dispatcher.autoserv_procs_cache = None
mblighbb421852008-03-11 22:36:16 +0000287 self._find_aborting()
288 self._find_more_work()
mbligh36768f02008-02-22 18:28:33 +0000289 self._handle_agents()
mbligh62ba2ed2008-04-30 17:09:25 +0000290 self._clear_inactive_blocks()
mbligh36768f02008-02-22 18:28:33 +0000291
292
293 def add_agent(self, agent):
294 self._agents.append(agent)
295 agent.dispatcher = self
mblighd5c95802008-03-05 00:33:46 +0000296
297 # Find agent corresponding to the specified queue_entry
298 def get_agents(self, queue_entry):
299 res_agents = []
300 for agent in self._agents:
301 if queue_entry.id in agent.queue_entry_ids:
302 res_agents.append(agent)
303 return res_agents
304
305
306 def remove_agent(self, agent):
307 self._agents.remove(agent)
mbligh36768f02008-02-22 18:28:33 +0000308
309
mbligh90a549d2008-03-25 23:52:34 +0000310 @classmethod
311 def find_autoservs(cls, orphans_only=False):
mblighbb421852008-03-11 22:36:16 +0000312 """\
313 Returns a dict mapping pids to command lines for root autoserv
mbligh90a549d2008-03-25 23:52:34 +0000314 processes. If orphans_only=True, return only processes that
315 have been orphaned (i.e. parent pid = 1).
mblighbb421852008-03-11 22:36:16 +0000316 """
mbligh90a549d2008-03-25 23:52:34 +0000317 if cls.autoserv_procs_cache is not None:
318 return cls.autoserv_procs_cache
319
mblighbb421852008-03-11 22:36:16 +0000320 proc = subprocess.Popen(
mbligh90a549d2008-03-25 23:52:34 +0000321 ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
mblighbb421852008-03-11 22:36:16 +0000322 stdout=subprocess.PIPE)
323 # split each line into the four columns output by ps
mbligh90a549d2008-03-25 23:52:34 +0000324 procs = [line.split(None, 4) for line in
mblighbb421852008-03-11 22:36:16 +0000325 proc.communicate()[0].splitlines()]
mbligh90a549d2008-03-25 23:52:34 +0000326 autoserv_procs = {}
327 for proc in procs:
328 # check ppid == 1 for orphans
329 if orphans_only and proc[2] != 1:
330 continue
331 # only root autoserv processes have pgid == pid
332 if (proc[3] == 'autoserv' and # comm
333 proc[1] == proc[0]): # pgid == pid
334 # map pid to args
335 autoserv_procs[int(proc[0])] = proc[4]
336 cls.autoserv_procs_cache = autoserv_procs
337 return autoserv_procs
mblighbb421852008-03-11 22:36:16 +0000338
339
340 def recover_queue_entry(self, queue_entry, run_monitor):
341 job = queue_entry.job
342 if job.is_synchronous():
343 all_queue_entries = job.get_host_queue_entries()
344 else:
345 all_queue_entries = [queue_entry]
346 all_queue_entry_ids = [queue_entry.id for queue_entry
347 in all_queue_entries]
348 queue_task = RecoveryQueueTask(
349 job=queue_entry.job,
350 queue_entries=all_queue_entries,
351 run_monitor=run_monitor)
352 self.add_agent(Agent(tasks=[queue_task],
353 queue_entry_ids=all_queue_entry_ids))
354
355
356 def _recover_processes(self):
mbligh90a549d2008-03-25 23:52:34 +0000357 orphans = self.find_autoservs(orphans_only=True)
mblighbb421852008-03-11 22:36:16 +0000358
359 # first, recover running queue entries
360 rows = _db.execute("""SELECT * FROM host_queue_entries
361 WHERE status = 'Running'""")
362 queue_entries = [HostQueueEntry(row=i) for i in rows]
363 requeue_entries = []
364 recovered_entry_ids = set()
365 for queue_entry in queue_entries:
366 run_monitor = PidfileRunMonitor(
367 queue_entry.results_dir())
368 pid, exit_code = run_monitor.get_pidfile_info()
369 if pid is None:
370 # autoserv apparently never got run, so requeue
371 requeue_entries.append(queue_entry)
372 continue
373 if queue_entry.id in recovered_entry_ids:
374 # synchronous job we've already recovered
375 continue
376 print 'Recovering queue entry %d (pid %d)' % (
377 queue_entry.id, pid)
378 job = queue_entry.job
379 if job.is_synchronous():
380 for entry in job.get_host_queue_entries():
381 assert entry.active
382 recovered_entry_ids.add(entry.id)
383 self.recover_queue_entry(queue_entry,
384 run_monitor)
385 orphans.pop(pid, None)
386
387 # and requeue other active queue entries
388 rows = _db.execute("""SELECT * FROM host_queue_entries
389 WHERE active AND NOT complete
390 AND status != 'Running'
391 AND status != 'Pending'
392 AND status != 'Abort'
393 AND status != 'Aborting'""")
394 queue_entries = [HostQueueEntry(row=i) for i in rows]
395 for queue_entry in queue_entries + requeue_entries:
396 print 'Requeuing running QE %d' % queue_entry.id
mbligh90a549d2008-03-25 23:52:34 +0000397 queue_entry.clear_results_dir(dont_delete_files=True)
mblighbb421852008-03-11 22:36:16 +0000398 queue_entry.requeue()
399
400
401 # now kill any remaining autoserv processes
402 for pid in orphans.keys():
403 print 'Killing orphan %d (%s)' % (pid, orphans[pid])
404 kill_autoserv(pid)
405
406 # recover aborting tasks
mbligh90a549d2008-03-25 23:52:34 +0000407 rebooting_host_ids = set()
mblighd5c95802008-03-05 00:33:46 +0000408 rows = _db.execute("""SELECT * FROM host_queue_entries
409 WHERE status='Abort' or status='Aborting'""")
mblighbb421852008-03-11 22:36:16 +0000410 queue_entries = [HostQueueEntry(row=i) for i in rows]
411 for queue_entry in queue_entries:
mbligh90a549d2008-03-25 23:52:34 +0000412 print 'Recovering aborting QE %d' % queue_entry.id
mblighbb421852008-03-11 22:36:16 +0000413 queue_host = queue_entry.get_host()
414 reboot_task = RebootTask(queue_host)
415 verify_task = VerifyTask(host = queue_host)
416 self.add_agent(Agent(tasks=[reboot_task,
417 verify_task],
418 queue_entry_ids=[queue_entry.id]))
419 queue_entry.set_status('Aborted')
420 # Secure the host from being picked up
421 queue_host.set_status('Rebooting')
mbligh90a549d2008-03-25 23:52:34 +0000422 rebooting_host_ids.add(queue_host.id)
mblighd5c95802008-03-05 00:33:46 +0000423
mblighbb421852008-03-11 22:36:16 +0000424 # reverify hosts that were in the middle of verify, repair or
425 # reboot
mbligh90a549d2008-03-25 23:52:34 +0000426 self._reverify_hosts_where("""(status = 'Repairing' OR
427 status = 'Verifying' OR
428 status = 'Rebooting')""",
429 exclude_ids=rebooting_host_ids)
430
431 # finally, recover "Running" hosts with no active queue entries,
432 # although this should never happen
433 message = ('Recovering running host %s - this probably '
434 'indicates a scheduler bug')
435 self._reverify_hosts_where("""status = 'Running' AND
436 id NOT IN (SELECT host_id
437 FROM host_queue_entries
438 WHERE active)""",
439 print_message=message)
440
441
442 def _reverify_hosts_where(self, where,
443 print_message='Reverifying host %s',
444 exclude_ids=set()):
mbligh5244cbb2008-04-24 20:39:52 +0000445 rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
446 'invalid = 0 AND ' + where)
mblighbb421852008-03-11 22:36:16 +0000447 hosts = [Host(row=i) for i in rows]
448 for host in hosts:
mbligh90a549d2008-03-25 23:52:34 +0000449 if host.id in exclude_ids:
450 continue
451 if print_message is not None:
452 print print_message % host.hostname
453 verify_task = VerifyTask(host = host)
454 self.add_agent(Agent(tasks = [verify_task]))
mblighbb421852008-03-11 22:36:16 +0000455
456
457 def _recover_hosts(self):
mbligh90a549d2008-03-25 23:52:34 +0000458 # recover "Repair Failed" hosts
459 message = 'Reverifying dead host %s'
460 self._reverify_hosts_where("status = 'Repair Failed'",
461 print_message=message)
mbligh36768f02008-02-22 18:28:33 +0000462
463
mbligh62ba2ed2008-04-30 17:09:25 +0000464 def _clear_inactive_blocks(self):
465 """
466 Clear out blocks for all completed jobs.
467 """
468 # this would be simpler using NOT IN (subquery), but MySQL
469 # treats all IN subqueries as dependent, so this optimizes much
470 # better
471 _db.execute("""
472 DELETE ihq FROM ineligible_host_queues ihq
473 LEFT JOIN (SELECT job_id FROM host_queue_entries
474 WHERE NOT complete) hqe
475 USING (job_id) WHERE hqe.job_id IS NULL""")
476
477
mbligh36768f02008-02-22 18:28:33 +0000478 def _find_more_work(self):
479 print "finding work"
480
481 num_started = 0
482 for host in idle_hosts():
483 tasks = host.next_queue_entries()
484 if tasks:
485 for next in tasks:
486 try:
487 agent = next.run(assigned_host=host)
488 if agent:
489 self.add_agent(agent)
490
491 num_started += 1
492 if num_started>=100:
493 return
494 break
495 except:
496 next.set_status('Failed')
497
498# if next.host:
499# next.host.set_status('Ready')
500
501 log_stacktrace("task_id = %d" % next.id)
502
503
mblighd5c95802008-03-05 00:33:46 +0000504 def _find_aborting(self):
505 num_aborted = 0
506 # Find jobs that are aborting
507 for entry in queue_entries_to_abort():
508 agents_to_abort = self.get_agents(entry)
509 entry_host = entry.get_host()
510 reboot_task = RebootTask(entry_host)
511 verify_task = VerifyTask(host = entry_host)
512 tasks = [reboot_task, verify_task]
513 if agents_to_abort:
514 abort_task = AbortTask(entry, agents_to_abort)
515 tasks.insert(0, abort_task)
516 else:
517 entry.set_status('Aborted')
518 # just to make sure this host does not get
519 # taken away
520 entry_host.set_status('Rebooting')
521 self.add_agent(Agent(tasks=tasks,
522 queue_entry_ids = [entry.id]))
523 num_aborted += 1
524 if num_aborted >= 50:
525 break
526
527
mbligh36768f02008-02-22 18:28:33 +0000528 def _handle_agents(self):
529 still_running = []
530 for agent in self._agents:
531 agent.tick()
532 if not agent.is_done():
533 still_running.append(agent)
534 else:
535 print "agent finished"
536 self._agents = still_running
537
538
class RunMonitor(object):
    """\
    Spawn and watch a subprocess, optionally nice'd and with stdout and
    stderr appended to a log file.
    """
    def __init__(self, cmd, nice_level = None, log_file = None):
        # cmd: argv list for the process to launch
        # nice_level: optional 'nice' increment for the child
        # log_file: optional path to append the child's output to
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        """Launch cmd and return the subprocess.Popen object."""
        if self.nice_level:
            # wrap the command in nice -n <level>
            nice_cmd = ['nice','-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                # an already-existing directory is fine; anything else is
                # logged but not fatal (we fall back to /dev/null below)
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                # separator banner with timestamp and command line
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            # no (usable) log file: discard the child's output
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        # stderr is folded into stdout; stdin is /dev/null
        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        # the child holds its own copies of these descriptors
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        # pass our exit_code method as the liveness poll function
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        # None while running, exit status once finished (Popen.poll)
        return self.proc.poll()
594
595
class PidfileException(Exception):
    """\
    Raised when the autoserv pidfile exhibits unexpected behavior.
    """
600
601
class PidfileRunMonitor(RunMonitor):
    """\
    RunMonitor variant that tracks autoserv through the pidfile it
    writes into its results directory (pid on line 1, exit status on
    line 2 once finished), so the scheduler can reattach to a process it
    did not spawn.
    """
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        # NOTE(review): joins the raw results_dir, not the abspath above;
        # relative paths rely on the cwd staying stable - confirm intended
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        # used to time out waiting for the pidfile (PIDFILE_TIMEOUT)
        self.start_time = time.time()
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            super(PidfileRunMonitor, self).__init__(cmd,
                                                    nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        """\
        True if command_line contains '-r <self.results_dir>' (with the
        given argument separator), i.e. it is our autoserv.
        """
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self, pid):
        """True if /proc shows pid alive and running our command line."""
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            # no such process (or unreadable entry)
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def read_pidfile(self):
        """\
        Return (pid, exit_status) from the pidfile: (None, None) if the
        file doesn't exist, exit_status None while autoserv still runs.
        """
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        # line 1 = pid, optional line 2 = exit status
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise Exception('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status


    def _find_autoserv_proc(self):
        """Scan the process table for an autoserv running on our dir."""
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
           None, None if autoserv has not yet run
           pid,  None if autoserv is running
           pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            # already declared lost: report the synthetic failure state
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                send_notify_email(error, message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            # waited too long: give up on this process entirely
            send_notify_email('Process has failed to write pidfile',
                              message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        # still within the grace period: report "not yet run"
        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1


    def exit_code(self):
        # exit status comes from the pidfile, not Popen.poll()
        pid, exit_code = self.get_pidfile_info()
        return exit_code
756
757
mbligh36768f02008-02-22 18:28:33 +0000758class Agent(object):
mblighd5c95802008-03-05 00:33:46 +0000759 def __init__(self, tasks, queue_entry_ids=[]):
mbligh36768f02008-02-22 18:28:33 +0000760 self.active_task = None
761 self.queue = Queue.Queue(0)
762 self.dispatcher = None
mblighd5c95802008-03-05 00:33:46 +0000763 self.queue_entry_ids = queue_entry_ids
mbligh36768f02008-02-22 18:28:33 +0000764
765 for task in tasks:
766 self.add_task(task)
767
768
769 def add_task(self, task):
770 self.queue.put_nowait(task)
771 task.agent = self
772
773
774 def tick(self):
775 print "agent tick"
776 if self.active_task and not self.active_task.is_done():
777 self.active_task.poll()
778 else:
779 self._next_task();
780
781
782 def _next_task(self):
783 print "agent picking task"
784 if self.active_task:
785 assert self.active_task.is_done()
786
mblighe2586682008-02-29 22:45:46 +0000787 if not self.active_task.success:
788 self.on_task_failure()
789
mbligh36768f02008-02-22 18:28:33 +0000790 self.active_task = None
791 if not self.is_done():
792 self.active_task = self.queue.get_nowait()
793 if self.active_task:
794 self.active_task.start()
795
796
mblighe2586682008-02-29 22:45:46 +0000797 def on_task_failure(self):
mblighe2586682008-02-29 22:45:46 +0000798 self.queue = Queue.Queue(0)
799 for task in self.active_task.failure_tasks:
800 self.add_task(task)
mbligh16c722d2008-03-05 00:58:44 +0000801
mblighe2586682008-02-29 22:45:46 +0000802
mbligh36768f02008-02-22 18:28:33 +0000803 def is_done(self):
804 return self.active_task == None and self.queue.empty()
805
806
807 def start(self):
808 assert self.dispatcher
809
810 self._next_task()
811
mblighd5c95802008-03-05 00:33:46 +0000812
mbligh36768f02008-02-22 18:28:33 +0000813class AgentTask(object):
mbligh16c722d2008-03-05 00:58:44 +0000814 def __init__(self, cmd, failure_tasks = []):
mbligh36768f02008-02-22 18:28:33 +0000815 self.done = False
816 self.failure_tasks = failure_tasks
817 self.started = False
818 self.cmd = cmd
mblighd5c95802008-03-05 00:33:46 +0000819 self.task = None
mbligh36768f02008-02-22 18:28:33 +0000820 self.agent = None
mblighd5c95802008-03-05 00:33:46 +0000821 self.monitor = None
mblighd64e5702008-04-04 21:39:28 +0000822 self.success = None
mbligh36768f02008-02-22 18:28:33 +0000823
824
825 def poll(self):
826 print "poll"
mblighd5c95802008-03-05 00:33:46 +0000827 if self.monitor:
mbligh36768f02008-02-22 18:28:33 +0000828 self.tick(self.monitor.exit_code())
829 else:
830 self.finished(False)
831
832
833 def tick(self, exit_code):
834 if exit_code==None:
835 return
836# print "exit_code was %d" % exit_code
837 if exit_code == 0:
838 success = True
839 else:
840 success = False
841
842 self.finished(success)
843
844
845 def is_done(self):
846 return self.done
847
848
849 def finished(self, success):
850 self.done = True
851 self.success = success
852 self.epilog()
853
854
855 def prolog(self):
856 pass
857
mblighd64e5702008-04-04 21:39:28 +0000858
859 def create_temp_resultsdir(self, suffix=''):
860 self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)
861
862
863 def cleanup(self):
864 if (hasattr(self, 'temp_results_dir') and
865 os.path.exists(self.temp_results_dir)):
866 shutil.rmtree(self.temp_results_dir)
867
mbligh36768f02008-02-22 18:28:33 +0000868
869 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +0000870 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000871
872
873 def start(self):
874 assert self.agent
875
876 if not self.started:
877 self.prolog()
878 self.run()
879
880 self.started = True
881
mblighd64e5702008-04-04 21:39:28 +0000882
mbligh36768f02008-02-22 18:28:33 +0000883 def abort(self):
mblighd5c95802008-03-05 00:33:46 +0000884 if self.monitor:
885 self.monitor.kill()
886 self.done = True
mblighd64e5702008-04-04 21:39:28 +0000887 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000888
889
890 def run(self):
891 if self.cmd:
892 print "agent starting monitor"
mbligh36768f02008-02-22 18:28:33 +0000893 log_file = None
894 if hasattr(self, 'host'):
mblighbb421852008-03-11 22:36:16 +0000895 log_file = os.path.join(RESULTS_DIR, 'hosts',
896 self.host.hostname)
897 self.monitor = RunMonitor(
898 self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
899 log_file = log_file)
mbligh36768f02008-02-22 18:28:33 +0000900
901
class RepairTask(AgentTask):
        """Runs 'autoserv -R' against a host to try to repair it."""

        def __init__(self, host, fail_queue_entry=None):
                """\
                fail_queue_entry: queue entry to mark failed if this repair
                fails.
                """
                self.create_temp_resultsdir('.repair')
                repair_cmd = [_autoserv_path, '-R', '-m', host.hostname,
                              '-r', self.temp_results_dir]
                self.host = host
                self.fail_queue_entry = fail_queue_entry
                super(RepairTask, self).__init__(repair_cmd)


        def prolog(self):
                print("repair_task starting")
                self.host.set_status('Repairing')


        def epilog(self):
                super(RepairTask, self).epilog()
                if self.success:
                        self.host.set_status('Ready')
                        return
                # repair failed; optionally fail the entry stuck on this host
                self.host.set_status('Repair Failed')
                if self.fail_queue_entry:
                        self.fail_queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +0000929
930
class VerifyTask(AgentTask):
        """Runs 'autoserv -v' against a host; on failure, falls back to a
        RepairTask for the same host."""

        def __init__(self, queue_entry=None, host=None):
                # exactly one of queue_entry / host must be supplied
                assert bool(queue_entry) != bool(host)

                self.host = host or queue_entry.host
                self.queue_entry = queue_entry

                self.create_temp_resultsdir('.verify')
                verify_cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
                              '-r', self.temp_results_dir]

                # a non-metahost entry is pinned to this host, so a failed
                # repair should fail the queue entry too
                fail_queue_entry = None
                if queue_entry and not queue_entry.meta_host:
                        fail_queue_entry = queue_entry
                failure_tasks = [RepairTask(self.host, fail_queue_entry)]

                super(VerifyTask, self).__init__(verify_cmd,
                                                 failure_tasks=failure_tasks)


        def prolog(self):
                print("starting verify on %s" % self.host.hostname)
                if self.queue_entry:
                        self.queue_entry.set_status('Verifying')
                        self.queue_entry.clear_results_dir(
                            self.queue_entry.verify_results_dir())
                self.host.set_status('Verifying')


        def cleanup(self):
                if not os.path.exists(self.temp_results_dir):
                        return
                # preserve verify results when they belong to an entry that
                # either succeeded or can't be rescheduled elsewhere
                keep_results = self.queue_entry and (
                    self.success or not self.queue_entry.meta_host)
                if keep_results:
                        self.move_results()
                super(VerifyTask, self).cleanup()


        def epilog(self):
                super(VerifyTask, self).epilog()

                if self.success:
                        self.host.set_status('Ready')
                elif self.queue_entry:
                        self.queue_entry.requeue()


        def move_results(self):
                """Move verify output from the temp dir into the entry's
                verify results dir (skipping the autoserv pidfile)."""
                assert self.queue_entry is not None
                target_dir = self.queue_entry.verify_results_dir()
                if not os.path.exists(target_dir):
                        os.makedirs(target_dir)
                for filename in os.listdir(self.temp_results_dir):
                        if filename == AUTOSERV_PID_FILE:
                                continue
                        self.force_move(
                            os.path.join(self.temp_results_dir, filename),
                            os.path.join(target_dir, filename))


        @staticmethod
        def force_move(source, dest):
                """\
                Replacement for shutil.move() that will delete the destination
                if it exists, even if it's a directory.
                """
                if os.path.exists(dest):
                        print('Warning: removing existing destination file ' +
                              dest)
                        remove_file_or_dir(dest)
                shutil.move(source, dest)
1003
1004
class VerifySynchronousTask(VerifyTask):
        """VerifyTask for entries of synchronous jobs: a successful verify
        moves the entry to Pending and launches the job once every entry of
        the job is ready."""

        def __init__(self, queue_entry):
                super(VerifySynchronousTask, self).__init__(
                    queue_entry=queue_entry)


        def epilog(self):
                super(VerifySynchronousTask, self).epilog()
                if not self.success:
                        return
                if self.queue_entry.job.num_complete() > 0:
                        # some other entry failed verify, and we've
                        # already been marked as stopped
                        return

                self.queue_entry.set_status('Pending')
                job = self.queue_entry.job
                if job.is_ready():
                        agent = job.run(self.queue_entry)
                        self.agent.dispatcher.add_agent(agent)
mblighe2586682008-02-29 22:45:46 +00001024
class QueueTask(AgentTask):
        """Runs the main autoserv process for one or more queue entries and
        records job timing keyvals around it."""

        def __init__(self, job, queue_entries, cmd):
                super(QueueTask, self).__init__(cmd)
                self.job = job
                self.queue_entries = queue_entries


        @staticmethod
        def _write_keyval(results_dir, field, value):
                # append a single 'field=value' line to the job keyval file;
                # NOTE(review): %d truncates the float timestamps passed in
                # to whole seconds -- confirm that's intended
                key_path = os.path.join(results_dir, 'keyval')
                keyval_file = open(key_path, 'a')
                print >> keyval_file, '%s=%d' % (field, value)
                keyval_file.close()


        def results_dir(self):
                # all entries handled by one QueueTask share a results dir
                return self.queue_entries[0].results_dir()


        def run(self):
                """\
                Override AgentTask.run() so we can use a PidfileRunMonitor.
                """
                self.monitor = PidfileRunMonitor(self.results_dir(),
                                                 cmd=self.cmd,
                                                 nice_level=AUTOSERV_NICE_LEVEL)


        def prolog(self):
                # write some job timestamps into the job keyval file
                queued = time.mktime(self.job.created_on.timetuple())
                started = time.time()
                self._write_keyval(self.results_dir(), "job_queued", queued)
                self._write_keyval(self.results_dir(), "job_started", started)
                # mark every entry (and its host) as running
                for queue_entry in self.queue_entries:
                        print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
                        queue_entry.set_status('Running')
                        queue_entry.host.set_status('Running')
                # asynchronous multi-machine jobs run one entry per task and
                # record their host in the job's .machines file
                if (not self.job.is_synchronous() and
                    self.job.num_machines() > 1):
                        assert len(self.queue_entries) == 1
                        self.job.write_to_machines_file(self.queue_entries[0])


        def epilog(self):
                super(QueueTask, self).epilog()
                if self.success:
                        status = 'Completed'
                else:
                        status = 'Failed'

                # write another timestamp into the job keyval file
                finished = time.time()
                self._write_keyval(self.results_dir(), "job_finished", finished)
                for queue_entry in self.queue_entries:
                        queue_entry.set_status(status)
                        queue_entry.host.set_status('Ready')

                # synchronous/single-machine jobs parse the whole job dir once
                # the job is finished; async multi-machine jobs parse per entry
                if self.job.is_synchronous() or self.job.num_machines()==1:
                        if self.job.is_finished():
                                parse_results(self.job.results_dir())
                else:
                        for queue_entry in self.queue_entries:
                                parse_results(queue_entry.results_dir(),
                                              flags='-l 2')

                print "queue_task finished with %s/%s" % (status, self.success)
1092
1093
class RecoveryQueueTask(QueueTask):
        """QueueTask variant that re-attaches to an autoserv process that
        was already running before a scheduler restart."""

        def __init__(self, job, queue_entries, run_monitor):
                super(RecoveryQueueTask, self).__init__(job, queue_entries,
                                                        cmd=None)
                self.run_monitor = run_monitor


        def run(self):
                # reuse the monitor for the pre-existing process instead of
                # spawning a new one
                self.monitor = self.run_monitor


        def prolog(self):
                # recovering an existing process - don't do prolog
                pass
1108
1109
class RebootTask(AgentTask):
        """Reboots a host via 'autoserv -b', falling back to a RepairTask
        when the reboot fails."""

        def __init__(self, host):
                # (the previous 'global _autoserv_path' declaration was a
                # no-op: reading a module global needs no global statement)

                # Current implementation of autoserv requires control file
                # to be passed on reboot action request. TODO: remove when no
                # longer appropriate.
                self.create_temp_resultsdir('.reboot')
                self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                            '-r', self.temp_results_dir, '/dev/null']
                self.host = host
                super(RebootTask, self).__init__(self.cmd,
                        failure_tasks=[RepairTask(host)])


        def prolog(self):
                print("starting reboot task for host: %s" % self.host.hostname)
                self.host.set_status("Rebooting")
1128
mblighd5c95802008-03-05 00:33:46 +00001129
class AbortTask(AgentTask):
        """Aborts a queue entry: detaches the agents working on it from the
        dispatcher and kills their active tasks."""

        def __init__(self, queue_entry, agents_to_abort):
                self.queue_entry = queue_entry
                self.agents_to_abort = agents_to_abort
                # pull the doomed agents out of the dispatcher immediately
                for agent in agents_to_abort:
                        agent.dispatcher.remove_agent(agent)
                super(AbortTask, self).__init__('')


        def prolog(self):
                print("starting abort on host %s, job %s" % (
                    self.queue_entry.host_id, self.queue_entry.job_id))
                self.queue_entry.set_status('Aborting')


        def epilog(self):
                super(AbortTask, self).epilog()
                self.queue_entry.set_status('Aborted')
                self.success = True


        def run(self):
                for agent in self.agents_to_abort:
                        active = agent.active_task
                        if active:
                                active.abort()
mbligh36768f02008-02-22 18:28:33 +00001155
1156
class DBObject(object):
        """Lightweight ORM base class for scheduler tables.

        Subclasses override _get_table() and pass their ordered column list
        to __init__; each column becomes an attribute on the instance.
        """

        def __init__(self, fields, id=None, row=None, new_record=False):
                """\
                fields: ordered list of column names; first must be 'id'
                id: primary key of the row to fetch
                row: pre-fetched row tuple (alternative to id)
                new_record: True when this object is not yet in the DB
                """
                # exactly one of id / row may be supplied
                assert (bool(id) != bool(row)) and fields

                self.__table = self._get_table()
                self.__fields = fields

                self.__new_record = new_record

                if row is None:
                        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
                        rows = _db.execute(sql, (id,))
                        if len(rows) == 0:
                                # fix: raising a plain string is deprecated
                                # (and a TypeError from python 2.6 on); raise
                                # a real exception with the same message
                                raise Exception(
                                    "row not found (table=%s, id=%s)"
                                    % (self.__table, id))
                        row = rows[0]

                assert len(row)==len(fields), (
                    "table = %s, row = %s/%d, fields = %s/%d" % (
                    self.__table, row, len(row), fields, len(fields)))

                # expose each column as an attribute and remember which
                # fields may be updated via update_field()
                self.__valid_fields = {}
                for i,value in enumerate(row):
                        self.__dict__[fields[i]] = value
                        self.__valid_fields[fields[i]] = True

                # the primary key must never be rewritten
                del self.__valid_fields['id']


        @classmethod
        def _get_table(cls):
                raise NotImplementedError('Subclasses must override this')


        def count(self, where, table = None):
                """Return COUNT(*) of rows matching the raw SQL where clause
                (against this object's table unless another is given)."""
                if not table:
                        table = self.__table

                rows = _db.execute("""
                        SELECT count(*) FROM %s
                        WHERE %s
                """ % (table, where))

                assert len(rows) == 1

                return int(rows[0][0])


        def num_cols(self):
                """Return the number of columns this object wraps."""
                return len(self.__fields)


        def update_field(self, field, value):
                """Set field to value on both this object and its DB row;
                no-op when the value is unchanged."""
                assert self.__valid_fields[field]

                if self.__dict__[field] == value:
                        return

                query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                        (self.__table, field)
                _db.execute(query, (value, self.id))

                self.__dict__[field] = value


        def save(self):
                """Insert this object into the DB (only for new records)."""
                if self.__new_record:
                        keys = self.__fields[1:] # avoid id
                        columns = ','.join([str(key) for key in keys])
                        # NOTE(review): naive '"%s"' quoting; values containing
                        # a double quote would break the statement -- only safe
                        # for internal data, consider parameterizing
                        values = ['"%s"' % self.__dict__[key] for key in keys]
                        values = ','.join(values)
                        query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                                (self.__table, columns, values)
                        _db.execute(query)


        def delete(self):
                """Delete this object's row from the DB."""
                query = 'DELETE FROM %s WHERE id=%%s' % self.__table
                _db.execute(query, (self.id,))


        @classmethod
        def fetch(cls, where, params=()):
                """Yield one instance per row matching the where clause."""
                rows = _db.execute(
                    'SELECT * FROM %s WHERE %s' % (cls._get_table(), where),
                    params)
                for row in rows:
                        yield cls(row=row)
1245
mbligh36768f02008-02-22 18:28:33 +00001246
class IneligibleHostQueue(DBObject):
        """Row wrapper for ineligible_host_queues, the table that blocks a
        (job, host) pairing from being scheduled."""

        def __init__(self, id=None, row=None, new_record=None):
                super(IneligibleHostQueue, self).__init__(
                    ['id', 'job_id', 'host_id'],
                    id=id, row=row, new_record=new_record)


        @classmethod
        def _get_table(cls):
                return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001257
1258
class Host(DBObject):
        """Row wrapper for the hosts table."""

        def __init__(self, id=None, row=None):
                super(Host, self).__init__(
                    ['id', 'hostname', 'locked', 'synch_id', 'status',
                     'invalid'],
                    id=id, row=row)


        @classmethod
        def _get_table(cls):
                return 'hosts'


        def current_task(self):
                """Return the active, incomplete HostQueueEntry running on
                this host, or None when the host is idle."""
                rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

                if len(rows) == 0:
                        return None
                # a host can be working on at most one entry at a time
                assert len(rows) == 1
                return HostQueueEntry(row=rows[0])


        def next_queue_entries(self):
                """Return the best queue entry this host could run next
                (highest priority first), or None if the host is locked or
                nothing is eligible."""
                if self.locked:
                        print("%s locked, not queuing" % self.hostname)
                        return None
                # eligible entries: directly assigned to this host, or
                # metahost entries matching one of its labels that are not
                # blocked in ineligible_host_queues
                rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE ((host_id=%s) OR (meta_host IS NOT null AND
                (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
                )
                AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
                )))
                AND NOT complete AND NOT active
                ORDER BY priority DESC, meta_host, id
                LIMIT 1
                """, (self.id,self.id, self.id))

                if len(rows) == 0:
                        return None
                return [HostQueueEntry(row=i) for i in rows]


        def yield_work(self):
                """Requeue whatever entry this host is currently running."""
                print("%s yielding work" % self.hostname)
                # fix: fetch the current task once instead of issuing the
                # same SELECT twice
                task = self.current_task()
                if task:
                        task.requeue()


        def set_status(self,status):
                print('%s -> %s' % (self.hostname, status))
                self.update_field('status',status)
1319
1320
class HostQueueEntry(DBObject):
        """Row wrapper for host_queue_entries: one (job, host) pairing with
        its scheduling status."""

        def __init__(self, id=None, row=None):
                assert id or row
                fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                          'meta_host', 'active', 'complete']
                super(HostQueueEntry, self).__init__(fields, id=id, row=row)
                self.job = Job(self.job_id)

                if self.host_id:
                        self.host = Host(self.host_id)
                else:
                        self.host = None

                # per-entry log of scheduling decisions, stored in the job's
                # results directory
                self.queue_log_path = os.path.join(self.job.results_dir(),
                                                   'queue.log.' + str(self.id))


        @classmethod
        def _get_table(cls):
                return 'host_queue_entries'


        def set_host(self, host):
                # assign (and block) a new host, or release the current one
                if host:
                        self.queue_log_record('Assigning host ' + host.hostname)
                        self.update_field('host_id', host.id)
                        self.update_field('active', True)
                        self.block_host(host.id)
                else:
                        self.queue_log_record('Releasing host')
                        self.unblock_host(self.host.id)
                        self.update_field('host_id', None)

                self.host = host


        def get_host(self):
                """Return the Host assigned to this entry (may be None)."""
                return self.host


        def queue_log_record(self, log_line):
                # append a timestamped line to this entry's queue log
                # (opened unbuffered so lines hit disk immediately)
                now = str(datetime.datetime.now())
                queue_log = open(self.queue_log_path, 'a', 0)
                queue_log.write(now + ' ' + log_line + '\n')
                queue_log.close()


        def block_host(self, host_id):
                """Mark host_id ineligible for this job, so a metahost entry
                of the same job won't be scheduled on it again."""
                print "creating block %s/%s" % (self.job.id, host_id)
                row = [0, self.job.id, host_id]
                block = IneligibleHostQueue(row=row, new_record=True)
                block.save()


        def unblock_host(self, host_id):
                """Remove the block created by block_host()."""
                print "removing block %s/%s" % (self.job.id, host_id)
                blocks = list(IneligibleHostQueue.fetch(
                    'job_id=%d and host_id=%d' % (self.job.id, host_id)))
                assert len(blocks) == 1
                blocks[0].delete()


        def results_dir(self):
                # synchronous and single-machine jobs share the job dir;
                # async multi-machine entries each get a per-host subdir
                if self.job.is_synchronous() or self.job.num_machines() == 1:
                        return self.job.job_dir
                else:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)


        def verify_results_dir(self):
                # note: split differs from results_dir() -- verify output
                # goes in a per-host subdir except in the async single-machine
                # case
                if self.job.is_synchronous() or self.job.num_machines() > 1:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)
                else:
                        return self.job.job_dir


        def set_status(self, status):
                """Update the status column, keeping the active/complete
                flags consistent with the new status."""
                self.update_field('status', status)
                if self.host:
                        hostname = self.host.hostname
                else:
                        hostname = 'no host'
                print "%s/%d status -> %s" % (hostname, self.id, self.status)
                if status in ['Queued']:
                        self.update_field('complete', False)
                        self.update_field('active', False)

                if status in ['Pending', 'Running', 'Verifying', 'Starting',
                              'Abort', 'Aborting']:
                        self.update_field('complete', False)
                        self.update_field('active', True)

                if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
                        self.update_field('complete', True)
                        self.update_field('active', False)


        def run(self,assigned_host=None):
                """Hand this entry to its job for execution; metahost entries
                must be given the host they were matched against."""
                if self.meta_host:
                        assert assigned_host
                        # ensure results dir exists for the queue log
                        self.job.create_results_dir()
                        self.set_host(assigned_host)

                print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                                self.meta_host, self.host.hostname, self.status)

                return self.job.run(queue_entry=self)

        def requeue(self):
                """Put this entry back on the queue; metahost entries also
                release their assigned host."""
                self.set_status('Queued')

                if self.meta_host:
                        self.set_host(None)


        def handle_host_failure(self):
                """\
                Called when this queue entry's host has failed verification and
                repair.
                """
                assert not self.meta_host
                self.set_status('Failed')
                if self.job.is_synchronous():
                        self.job.stop_all_entries()


        def clear_results_dir(self, results_dir=None, dont_delete_files=False):
                """Empty the given results dir (default: this entry's); with
                dont_delete_files, contents are moved to a temp dir instead
                of being deleted."""
                results_dir = results_dir or self.results_dir()
                if not os.path.exists(results_dir):
                        return
                if dont_delete_files:
                        temp_dir = tempfile.mkdtemp(suffix='.clear_results')
                        print 'Moving results from %s to %s' % (results_dir,
                                                                temp_dir)
                for filename in os.listdir(results_dir):
                        path = os.path.join(results_dir, filename)
                        if dont_delete_files:
                                shutil.move(path,
                                            os.path.join(temp_dir, filename))
                        else:
                                remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001467
1468
1469class Job(DBObject):
1470 def __init__(self, id=None, row=None):
1471 assert id or row
mblighd64e5702008-04-04 21:39:28 +00001472 super(Job, self).__init__(
mblighe2586682008-02-29 22:45:46 +00001473 ['id','owner','name','priority',
1474 'control_file','control_type','created_on',
1475 'synch_type', 'synch_count','synchronizing'],
1476 id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00001477
mblighe2586682008-02-29 22:45:46 +00001478 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
1479 self.owner))
1480
1481
1482 @classmethod
1483 def _get_table(cls):
1484 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00001485
1486
1487 def is_server_job(self):
1488 return self.control_type != 2
1489
1490
1491 def get_host_queue_entries(self):
mbligh6f8bab42008-02-29 22:45:14 +00001492 rows = _db.execute("""
mbligh36768f02008-02-22 18:28:33 +00001493 SELECT * FROM host_queue_entries
1494 WHERE job_id= %s
1495 """, (self.id,))
mbligh6f8bab42008-02-29 22:45:14 +00001496 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00001497
1498 assert len(entries)>0
1499
1500 return entries
1501
1502
1503 def set_status(self, status, update_queues=False):
1504 self.update_field('status',status)
1505
1506 if update_queues:
1507 for queue_entry in self.get_host_queue_entries():
1508 queue_entry.set_status(status)
1509
1510
1511 def is_synchronous(self):
1512 return self.synch_type == 2
1513
1514
1515 def is_ready(self):
1516 if not self.is_synchronous():
1517 return True
1518 sql = "job_id=%s AND status='Pending'" % self.id
1519 count = self.count(sql, table='host_queue_entries')
1520 return (count == self.synch_count)
1521
1522
1523 def ready_to_synchronize(self):
1524 # heuristic
1525 queue_entries = self.get_host_queue_entries()
1526 count = 0
1527 for queue_entry in queue_entries:
1528 if queue_entry.status == 'Pending':
1529 count += 1
1530
1531 return (count/self.synch_count >= 0.5)
1532
1533
1534 def start_synchronizing(self):
1535 self.update_field('synchronizing', True)
1536
1537
1538 def results_dir(self):
1539 return self.job_dir
1540
1541 def num_machines(self, clause = None):
1542 sql = "job_id=%s" % self.id
1543 if clause:
1544 sql += " AND (%s)" % clause
1545 return self.count(sql, table='host_queue_entries')
1546
1547
1548 def num_queued(self):
1549 return self.num_machines('not complete')
1550
1551
1552 def num_active(self):
1553 return self.num_machines('active')
1554
1555
1556 def num_complete(self):
1557 return self.num_machines('complete')
1558
1559
1560 def is_finished(self):
1561 left = self.num_queued()
1562 print "%s: %s machines left" % (self.name, left)
1563 return left==0
1564
1565 def stop_synchronizing(self):
1566 self.update_field('synchronizing', False)
1567 self.set_status('Queued', update_queues = False)
1568
1569
mblighe2586682008-02-29 22:45:46 +00001570 def stop_all_entries(self):
1571 for child_entry in self.get_host_queue_entries():
1572 if not child_entry.complete:
1573 child_entry.set_status('Stopped')
1574
1575
1576 def write_to_machines_file(self, queue_entry):
1577 hostname = queue_entry.get_host().hostname
1578 print "writing %s to job %s machines file" % (hostname, self.id)
1579 file_path = os.path.join(self.job_dir, '.machines')
1580 mf = open(file_path, 'a')
1581 mf.write("%s\n" % queue_entry.get_host().hostname)
1582 mf.close()
mbligh36768f02008-02-22 18:28:33 +00001583
1584
1585 def create_results_dir(self, queue_entry=None):
1586 print "create: active: %s complete %s" % (self.num_active(),
1587 self.num_complete())
1588
1589 if not os.path.exists(self.job_dir):
1590 os.makedirs(self.job_dir)
1591
1592 if queue_entry:
1593 return queue_entry.results_dir()
1594 return self.job_dir
1595
1596
1597 def run(self, queue_entry):
1598 results_dir = self.create_results_dir(queue_entry)
1599
1600 if self.is_synchronous():
1601 if not self.is_ready():
mblighd5c95802008-03-05 00:33:46 +00001602 return Agent([VerifySynchronousTask(
1603 queue_entry = queue_entry)],
1604 [queue_entry.id])
mbligh36768f02008-02-22 18:28:33 +00001605
1606 queue_entry.set_status('Starting')
1607
1608 ctrl = open(os.tmpnam(), 'w')
1609 if self.control_file:
1610 ctrl.write(self.control_file)
1611 else:
1612 ctrl.write("")
1613 ctrl.flush()
1614
1615 if self.is_synchronous():
mbligh36768f02008-02-22 18:28:33 +00001616 queue_entries = self.get_host_queue_entries()
1617 else:
1618 assert queue_entry
mbligh36768f02008-02-22 18:28:33 +00001619 queue_entries = [queue_entry]
mblighe2586682008-02-29 22:45:46 +00001620 hostnames = ','.join([entry.get_host().hostname
1621 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00001622
mbligh6437ff52008-04-17 15:24:38 +00001623 # determine the job tag
1624 if self.is_synchronous() or self.num_machines() == 1:
1625 job_name = "%s-%s" % (self.id, self.owner)
1626 else:
1627 job_name = "%s-%s/%s" % (self.id, self.owner,
1628 hostnames)
1629
1630 params = [_autoserv_path, '-P', job_name, '-p', '-n',
mblighbb421852008-03-11 22:36:16 +00001631 '-r', os.path.abspath(results_dir),
1632 '-b', '-u', self.owner, '-l', self.name,
1633 '-m', hostnames, ctrl.name]
mbligh36768f02008-02-22 18:28:33 +00001634
1635 if not self.is_server_job():
1636 params.append('-c')
1637
1638 tasks = []
1639 if not self.is_synchronous():
1640 tasks.append(VerifyTask(queue_entry))
mblighe2586682008-02-29 22:45:46 +00001641
1642 tasks.append(QueueTask(job = self,
1643 queue_entries = queue_entries,
1644 cmd = params))
mbligh36768f02008-02-22 18:28:33 +00001645
mblighd5c95802008-03-05 00:33:46 +00001646 ids = []
1647 for entry in queue_entries:
1648 ids.append(entry.id)
1649
1650 agent = Agent(tasks, ids)
mbligh36768f02008-02-22 18:28:33 +00001651
1652 return agent
1653
1654
# entry point: parse command-line options and run the scheduler loop
if __name__ == '__main__':
        main()