blob: 5dee8325cea83884603f3025c12eee8dbfa3acc5 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighcadb3532008-04-15 17:46:26 +00009import optparse, signal, smtplib, socket, datetime, stat, pwd, errno
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
mbligh36768f02008-02-22 18:28:33 +000012RESULTS_DIR = '.'
13AUTOSERV_NICE_LEVEL = 10
14
15AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
16
17if os.environ.has_key('AUTOTEST_DIR'):
18 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
19AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
20AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
21
22if AUTOTEST_SERVER_DIR not in sys.path:
23 sys.path.insert(0, AUTOTEST_SERVER_DIR)
24
mblighbb421852008-03-11 22:36:16 +000025AUTOSERV_PID_FILE = '.autoserv_execute'
mbligh90a549d2008-03-25 23:52:34 +000026# how long to wait for autoserv to write a pidfile
27PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000028
mbligh6f8bab42008-02-29 22:45:14 +000029_db = None
mbligh36768f02008-02-22 18:28:33 +000030_shutdown = False
31_notify_email = None
mbligh4314a712008-02-29 22:44:30 +000032_autoserv_path = 'autoserv'
33_testing_mode = False
mbligh36768f02008-02-22 18:28:33 +000034
35
def main():
    """Scheduler entry point.

    Parses command-line options, loads the notify address from
    global_config, initializes logging/DB/signals, performs initial
    recovery, then runs the dispatch loop every 20 seconds until a
    SIGINT sets _shutdown.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    config = global_config.global_config
    global _notify_email
    email = config.get_config_value("SCHEDULER", "notify_email")
    if email != "":
        _notify_email = email

    if options.test:
        # dummy autoserv binary + test DB schema, no result parsing
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        # intentionally broad: any error terminates the scheduler, but
        # is logged and emailed first
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +000081
82
83def handle_sigint(signum, frame):
84 global _shutdown
85 _shutdown = True
86 print "Shutdown request received."
87
88
89def init(logfile):
90 if logfile:
91 enable_logging(logfile)
92 print "%s> dispatcher starting" % time.strftime("%X %x")
93 print "My PID is %d" % os.getpid()
94
95 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
mbligh6f8bab42008-02-29 22:45:14 +000096 global _db
97 _db = DatabaseConn()
mbligh36768f02008-02-22 18:28:33 +000098
99 print "Setting signal handler"
100 signal.signal(signal.SIGINT, handle_sigint)
101
102 print "Connected! Running..."
103
104
105def enable_logging(logfile):
106 out_file = logfile
107 err_file = "%s.err" % logfile
108 print "Enabling logging to %s (%s)" % (out_file, err_file)
109 out_fd = open(out_file, "a", buffering=0)
110 err_fd = open(err_file, "a", buffering=0)
111
112 os.dup2(out_fd.fileno(), sys.stdout.fileno())
113 os.dup2(err_fd.fileno(), sys.stderr.fileno())
114
115 sys.stdout = out_fd
116 sys.stderr = err_fd
117
118
def idle_hosts():
    """Return Host objects for every unlocked, valid, Ready (or
    status-less) host that has pending work and no active queue entry."""
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND invalid=false
        AND (h.status IS null OR h.status='Ready') """)
    return [Host(row=row) for row in rows]
132
def queue_entries_to_abort():
    """Fetch every HostQueueEntry whose status is 'Abort'."""
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000139
def remove_file_or_dir(path):
    """Delete *path*: directory trees recursively, files unlinked.

    Raises OSError if *path* does not exist (os.stat fails).
    """
    mode = os.stat(path).st_mode
    if stat.S_ISDIR(mode):
        shutil.rmtree(path)
        return
    os.remove(path)
147
148
mbligh6f8bab42008-02-29 22:45:14 +0000149class DatabaseConn:
150 def __init__(self):
151 self.reconnect_wait = 20
152 self.conn = None
153 self.cur = None
mbligh36768f02008-02-22 18:28:33 +0000154
mbligh4eb2df22008-03-13 15:39:29 +0000155 import MySQLdb.converters
156 self.convert_dict = MySQLdb.converters.conversions
157 self.convert_dict.setdefault(bool, self.convert_boolean)
158
mbligh6f8bab42008-02-29 22:45:14 +0000159 self.connect()
mbligh36768f02008-02-22 18:28:33 +0000160
161
mbligh4eb2df22008-03-13 15:39:29 +0000162 @staticmethod
163 def convert_boolean(boolean, conversion_dict):
164 'Convert booleans to integer strings'
165 return str(int(boolean))
166
167
mbligh6f8bab42008-02-29 22:45:14 +0000168 def connect(self):
169 self.disconnect()
170
171 # get global config and parse for info
172 c = global_config.global_config
173 dbase = "AUTOTEST_WEB"
mbligh104e9ce2008-03-11 22:01:44 +0000174 DB_HOST = c.get_config_value(dbase, "host")
175 DB_SCHEMA = c.get_config_value(dbase, "database")
mbligh6f8bab42008-02-29 22:45:14 +0000176
177 global _testing_mode
178 if _testing_mode:
179 DB_SCHEMA = 'stresstest_autotest_web'
180
mbligh104e9ce2008-03-11 22:01:44 +0000181 DB_USER = c.get_config_value(dbase, "user")
182 DB_PASS = c.get_config_value(dbase, "password")
mbligh6f8bab42008-02-29 22:45:14 +0000183
184 while not self.conn:
185 try:
mbligh4eb2df22008-03-13 15:39:29 +0000186 self.conn = MySQLdb.connect(
187 host=DB_HOST, user=DB_USER, passwd=DB_PASS,
188 db=DB_SCHEMA, conv=self.convert_dict)
mbligh6f8bab42008-02-29 22:45:14 +0000189
190 self.conn.autocommit(True)
191 self.cur = self.conn.cursor()
192 except MySQLdb.OperationalError:
mblighbb421852008-03-11 22:36:16 +0000193 traceback.print_exc()
mbligh6f8bab42008-02-29 22:45:14 +0000194 print "Can't connect to MYSQL; reconnecting"
195 time.sleep(self.reconnect_wait)
196 self.disconnect()
197
198
199 def disconnect(self):
200 if self.conn:
201 self.conn.close()
202 self.conn = None
203 self.cur = None
204
205
206 def execute(self, *args, **dargs):
207 while (True):
208 try:
209 self.cur.execute(*args, **dargs)
210 return self.cur.fetchall()
211 except MySQLdb.OperationalError:
mblighbb421852008-03-11 22:36:16 +0000212 traceback.print_exc()
mbligh6f8bab42008-02-29 22:45:14 +0000213 print "MYSQL connection died; reconnecting"
214 time.sleep(self.reconnect_wait)
215 self.connect()
mbligh36768f02008-02-22 18:28:33 +0000216
217
def generate_parse_command(results_dir, flags=""):
    """Build the shell command that runs the TKO parser over
    *results_dir* in the background, logging to .parse.log."""
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    template = "%s %s -r -o %s > %s 2>&1 &"
    return template % (parse, flags, results_dir, output)
223
224
def parse_results(results_dir, flags=""):
    """Kick off the TKO parser on *results_dir* (no-op in test mode)."""
    if _testing_mode:
        return
    os.system(generate_parse_command(results_dir, flags))
mbligh36768f02008-02-22 18:28:33 +0000229
230
def send_notify_email(subject, message):
    """Mail *message* to the configured _notify_email address.

    Silently does nothing when no notify address is configured. The
    message body is prefixed with hostname / pid / timestamp.
    """
    if not _notify_email:
        return

    message = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                    time.strftime("%X %x"), message)
    sender = pwd.getpwuid(os.getuid())[0]  # see os.getlogin() online docs
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, message)
    smtp_conn = smtplib.SMTP('localhost')
    smtp_conn.sendmail(sender, _notify_email, msg)
    smtp_conn.quit()
243
244
def log_stacktrace(reason):
    """Write the current exception (with *reason*) to stderr and email it.

    Must be called from inside an except block so sys.exc_info() is
    populated.
    """
    # renamed locals: the original shadowed the builtins `type` and `str`
    exc_type, exc_value, exc_tb = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value,
                                                  exc_tb))

    sys.stderr.write("\n%s\n" % message)
    send_notify_email("monitor_db exception", message)
mbligh36768f02008-02-22 18:28:33 +0000252
mblighbb421852008-03-11 22:36:16 +0000253
def get_proc_poll_fn(pid):
    """Build a poll() substitute for a process we did not spawn.

    The returned callable mimics subprocess.Popen.poll(): None while
    /proc/<pid> still exists, 0 once it is gone (a foreign process's
    real exit status is unknowable here).
    """
    proc_path = os.path.join('/proc', str(pid))

    def poll_fn():
        # None == still running; 0 == gone (fake exit code)
        return None if os.path.exists(proc_path) else 0

    return poll_fn
261
262
263def kill_autoserv(pid, poll_fn=None):
264 print 'killing', pid
265 if poll_fn is None:
266 poll_fn = get_proc_poll_fn(pid)
267 if poll_fn() == None:
268 os.kill(pid, signal.SIGCONT)
269 os.kill(pid, signal.SIGTERM)
mbligh36768f02008-02-22 18:28:33 +0000270
271
272class Dispatcher:
mbligh90a549d2008-03-25 23:52:34 +0000273 autoserv_procs_cache = None
274
mblighbb421852008-03-11 22:36:16 +0000275 def __init__(self):
mbligh36768f02008-02-22 18:28:33 +0000276 self._agents = []
mbligh36768f02008-02-22 18:28:33 +0000277
mbligh36768f02008-02-22 18:28:33 +0000278
mblighbb421852008-03-11 22:36:16 +0000279 def do_initial_recovery(self, recover_hosts=True):
280 # always recover processes
281 self._recover_processes()
282
283 if recover_hosts:
284 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000285
286
287 def tick(self):
mbligh90a549d2008-03-25 23:52:34 +0000288 Dispatcher.autoserv_procs_cache = None
mblighbb421852008-03-11 22:36:16 +0000289 self._find_aborting()
290 self._find_more_work()
mbligh36768f02008-02-22 18:28:33 +0000291 self._handle_agents()
mbligh62ba2ed2008-04-30 17:09:25 +0000292 self._clear_inactive_blocks()
mbligh36768f02008-02-22 18:28:33 +0000293
294
295 def add_agent(self, agent):
296 self._agents.append(agent)
297 agent.dispatcher = self
mblighd5c95802008-03-05 00:33:46 +0000298
299 # Find agent corresponding to the specified queue_entry
300 def get_agents(self, queue_entry):
301 res_agents = []
302 for agent in self._agents:
303 if queue_entry.id in agent.queue_entry_ids:
304 res_agents.append(agent)
305 return res_agents
306
307
308 def remove_agent(self, agent):
309 self._agents.remove(agent)
mbligh36768f02008-02-22 18:28:33 +0000310
311
mbligh90a549d2008-03-25 23:52:34 +0000312 @classmethod
313 def find_autoservs(cls, orphans_only=False):
mblighbb421852008-03-11 22:36:16 +0000314 """\
315 Returns a dict mapping pids to command lines for root autoserv
mbligh90a549d2008-03-25 23:52:34 +0000316 processes. If orphans_only=True, return only processes that
317 have been orphaned (i.e. parent pid = 1).
mblighbb421852008-03-11 22:36:16 +0000318 """
mbligh90a549d2008-03-25 23:52:34 +0000319 if cls.autoserv_procs_cache is not None:
320 return cls.autoserv_procs_cache
321
mblighbb421852008-03-11 22:36:16 +0000322 proc = subprocess.Popen(
mbligh90a549d2008-03-25 23:52:34 +0000323 ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
mblighbb421852008-03-11 22:36:16 +0000324 stdout=subprocess.PIPE)
325 # split each line into the four columns output by ps
mbligh90a549d2008-03-25 23:52:34 +0000326 procs = [line.split(None, 4) for line in
mblighbb421852008-03-11 22:36:16 +0000327 proc.communicate()[0].splitlines()]
mbligh90a549d2008-03-25 23:52:34 +0000328 autoserv_procs = {}
329 for proc in procs:
330 # check ppid == 1 for orphans
331 if orphans_only and proc[2] != 1:
332 continue
333 # only root autoserv processes have pgid == pid
334 if (proc[3] == 'autoserv' and # comm
335 proc[1] == proc[0]): # pgid == pid
336 # map pid to args
337 autoserv_procs[int(proc[0])] = proc[4]
338 cls.autoserv_procs_cache = autoserv_procs
339 return autoserv_procs
mblighbb421852008-03-11 22:36:16 +0000340
341
342 def recover_queue_entry(self, queue_entry, run_monitor):
343 job = queue_entry.job
344 if job.is_synchronous():
345 all_queue_entries = job.get_host_queue_entries()
346 else:
347 all_queue_entries = [queue_entry]
348 all_queue_entry_ids = [queue_entry.id for queue_entry
349 in all_queue_entries]
350 queue_task = RecoveryQueueTask(
351 job=queue_entry.job,
352 queue_entries=all_queue_entries,
353 run_monitor=run_monitor)
354 self.add_agent(Agent(tasks=[queue_task],
355 queue_entry_ids=all_queue_entry_ids))
356
357
358 def _recover_processes(self):
mbligh90a549d2008-03-25 23:52:34 +0000359 orphans = self.find_autoservs(orphans_only=True)
mblighbb421852008-03-11 22:36:16 +0000360
361 # first, recover running queue entries
362 rows = _db.execute("""SELECT * FROM host_queue_entries
363 WHERE status = 'Running'""")
364 queue_entries = [HostQueueEntry(row=i) for i in rows]
365 requeue_entries = []
366 recovered_entry_ids = set()
367 for queue_entry in queue_entries:
368 run_monitor = PidfileRunMonitor(
369 queue_entry.results_dir())
370 pid, exit_code = run_monitor.get_pidfile_info()
371 if pid is None:
372 # autoserv apparently never got run, so requeue
373 requeue_entries.append(queue_entry)
374 continue
375 if queue_entry.id in recovered_entry_ids:
376 # synchronous job we've already recovered
377 continue
378 print 'Recovering queue entry %d (pid %d)' % (
379 queue_entry.id, pid)
380 job = queue_entry.job
381 if job.is_synchronous():
382 for entry in job.get_host_queue_entries():
383 assert entry.active
384 recovered_entry_ids.add(entry.id)
385 self.recover_queue_entry(queue_entry,
386 run_monitor)
387 orphans.pop(pid, None)
388
389 # and requeue other active queue entries
390 rows = _db.execute("""SELECT * FROM host_queue_entries
391 WHERE active AND NOT complete
392 AND status != 'Running'
393 AND status != 'Pending'
394 AND status != 'Abort'
395 AND status != 'Aborting'""")
396 queue_entries = [HostQueueEntry(row=i) for i in rows]
397 for queue_entry in queue_entries + requeue_entries:
398 print 'Requeuing running QE %d' % queue_entry.id
mbligh90a549d2008-03-25 23:52:34 +0000399 queue_entry.clear_results_dir(dont_delete_files=True)
mblighbb421852008-03-11 22:36:16 +0000400 queue_entry.requeue()
401
402
403 # now kill any remaining autoserv processes
404 for pid in orphans.keys():
405 print 'Killing orphan %d (%s)' % (pid, orphans[pid])
406 kill_autoserv(pid)
407
408 # recover aborting tasks
mbligh90a549d2008-03-25 23:52:34 +0000409 rebooting_host_ids = set()
mblighd5c95802008-03-05 00:33:46 +0000410 rows = _db.execute("""SELECT * FROM host_queue_entries
411 WHERE status='Abort' or status='Aborting'""")
mblighbb421852008-03-11 22:36:16 +0000412 queue_entries = [HostQueueEntry(row=i) for i in rows]
413 for queue_entry in queue_entries:
mbligh90a549d2008-03-25 23:52:34 +0000414 print 'Recovering aborting QE %d' % queue_entry.id
mblighbb421852008-03-11 22:36:16 +0000415 queue_host = queue_entry.get_host()
416 reboot_task = RebootTask(queue_host)
417 verify_task = VerifyTask(host = queue_host)
418 self.add_agent(Agent(tasks=[reboot_task,
419 verify_task],
420 queue_entry_ids=[queue_entry.id]))
421 queue_entry.set_status('Aborted')
422 # Secure the host from being picked up
423 queue_host.set_status('Rebooting')
mbligh90a549d2008-03-25 23:52:34 +0000424 rebooting_host_ids.add(queue_host.id)
mblighd5c95802008-03-05 00:33:46 +0000425
mblighbb421852008-03-11 22:36:16 +0000426 # reverify hosts that were in the middle of verify, repair or
427 # reboot
mbligh90a549d2008-03-25 23:52:34 +0000428 self._reverify_hosts_where("""(status = 'Repairing' OR
429 status = 'Verifying' OR
430 status = 'Rebooting')""",
431 exclude_ids=rebooting_host_ids)
432
433 # finally, recover "Running" hosts with no active queue entries,
434 # although this should never happen
435 message = ('Recovering running host %s - this probably '
436 'indicates a scheduler bug')
437 self._reverify_hosts_where("""status = 'Running' AND
438 id NOT IN (SELECT host_id
439 FROM host_queue_entries
440 WHERE active)""",
441 print_message=message)
442
443
444 def _reverify_hosts_where(self, where,
445 print_message='Reverifying host %s',
446 exclude_ids=set()):
mbligh5244cbb2008-04-24 20:39:52 +0000447 rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
448 'invalid = 0 AND ' + where)
mblighbb421852008-03-11 22:36:16 +0000449 hosts = [Host(row=i) for i in rows]
450 for host in hosts:
mbligh90a549d2008-03-25 23:52:34 +0000451 if host.id in exclude_ids:
452 continue
453 if print_message is not None:
454 print print_message % host.hostname
455 verify_task = VerifyTask(host = host)
456 self.add_agent(Agent(tasks = [verify_task]))
mblighbb421852008-03-11 22:36:16 +0000457
458
459 def _recover_hosts(self):
mbligh90a549d2008-03-25 23:52:34 +0000460 # recover "Repair Failed" hosts
461 message = 'Reverifying dead host %s'
462 self._reverify_hosts_where("status = 'Repair Failed'",
463 print_message=message)
mbligh36768f02008-02-22 18:28:33 +0000464
465
mbligh62ba2ed2008-04-30 17:09:25 +0000466 def _clear_inactive_blocks(self):
467 """
468 Clear out blocks for all completed jobs.
469 """
470 # this would be simpler using NOT IN (subquery), but MySQL
471 # treats all IN subqueries as dependent, so this optimizes much
472 # better
473 _db.execute("""
474 DELETE ihq FROM ineligible_host_queues ihq
475 LEFT JOIN (SELECT job_id FROM host_queue_entries
476 WHERE NOT complete) hqe
477 USING (job_id) WHERE hqe.job_id IS NULL""")
478
479
mbligh36768f02008-02-22 18:28:33 +0000480 def _find_more_work(self):
481 print "finding work"
482
483 num_started = 0
484 for host in idle_hosts():
485 tasks = host.next_queue_entries()
486 if tasks:
487 for next in tasks:
488 try:
489 agent = next.run(assigned_host=host)
490 if agent:
491 self.add_agent(agent)
492
493 num_started += 1
494 if num_started>=100:
495 return
496 break
497 except:
498 next.set_status('Failed')
499
500# if next.host:
501# next.host.set_status('Ready')
502
503 log_stacktrace("task_id = %d" % next.id)
504
505
mblighd5c95802008-03-05 00:33:46 +0000506 def _find_aborting(self):
507 num_aborted = 0
508 # Find jobs that are aborting
509 for entry in queue_entries_to_abort():
510 agents_to_abort = self.get_agents(entry)
511 entry_host = entry.get_host()
512 reboot_task = RebootTask(entry_host)
513 verify_task = VerifyTask(host = entry_host)
514 tasks = [reboot_task, verify_task]
515 if agents_to_abort:
516 abort_task = AbortTask(entry, agents_to_abort)
517 tasks.insert(0, abort_task)
518 else:
519 entry.set_status('Aborted')
520 # just to make sure this host does not get
521 # taken away
522 entry_host.set_status('Rebooting')
523 self.add_agent(Agent(tasks=tasks,
524 queue_entry_ids = [entry.id]))
525 num_aborted += 1
526 if num_aborted >= 50:
527 break
528
529
mbligh36768f02008-02-22 18:28:33 +0000530 def _handle_agents(self):
531 still_running = []
532 for agent in self._agents:
533 agent.tick()
534 if not agent.is_done():
535 still_running.append(agent)
536 else:
537 print "agent finished"
538 self._agents = still_running
539
540
541class RunMonitor(object):
542 def __init__(self, cmd, nice_level = None, log_file = None):
543 self.nice_level = nice_level
544 self.log_file = log_file
545 self.proc = self.run(cmd)
546
547 def run(self, cmd):
548 if self.nice_level:
549 nice_cmd = ['nice','-n', str(self.nice_level)]
550 nice_cmd.extend(cmd)
551 cmd = nice_cmd
552
553 out_file = None
554 if self.log_file:
555 try:
mblighbb421852008-03-11 22:36:16 +0000556 os.makedirs(os.path.dirname(self.log_file))
mblighcadb3532008-04-15 17:46:26 +0000557 except OSError, exc:
558 if exc.errno != errno.EEXIST:
559 log_stacktrace(
560 'Unexpected error creating logfile '
561 'directory for %s' % self.log_file)
562 try:
mbligh36768f02008-02-22 18:28:33 +0000563 out_file = open(self.log_file, 'a')
564 out_file.write("\n%s\n" % ('*'*80))
mblighcadb3532008-04-15 17:46:26 +0000565 out_file.write("%s> %s\n" %
566 (time.strftime("%X %x"), cmd))
mbligh36768f02008-02-22 18:28:33 +0000567 out_file.write("%s\n" % ('*'*80))
mblighcadb3532008-04-15 17:46:26 +0000568 except (OSError, IOError):
569 log_stacktrace('Error opening log file %s' %
570 self.log_file)
571
mbligh36768f02008-02-22 18:28:33 +0000572 if not out_file:
573 out_file = open('/dev/null', 'w')
mblighcadb3532008-04-15 17:46:26 +0000574
mbligh36768f02008-02-22 18:28:33 +0000575 in_devnull = open('/dev/null', 'r')
576 print "cmd = %s" % cmd
577 print "path = %s" % os.getcwd()
578
579 proc = subprocess.Popen(cmd, stdout=out_file,
580 stderr=subprocess.STDOUT, stdin=in_devnull)
581 out_file.close()
582 in_devnull.close()
583 return proc
584
585
mblighbb421852008-03-11 22:36:16 +0000586 def get_pid(self):
587 return self.proc.pid
588
589
mbligh36768f02008-02-22 18:28:33 +0000590 def kill(self):
mblighbb421852008-03-11 22:36:16 +0000591 kill_autoserv(self.get_pid(), self.exit_code)
592
mbligh36768f02008-02-22 18:28:33 +0000593
594 def exit_code(self):
595 return self.proc.poll()
596
597
class PidfileException(Exception):
    """Raised when the autoserv pid file misbehaves unexpectedly."""
602
603
604class PidfileRunMonitor(RunMonitor):
605 def __init__(self, results_dir, cmd=None, nice_level=None,
606 log_file=None):
607 self.results_dir = os.path.abspath(results_dir)
608 self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
609 self.lost_process = False
mbligh90a549d2008-03-25 23:52:34 +0000610 self.start_time = time.time()
mblighbb421852008-03-11 22:36:16 +0000611 if cmd is None:
612 # we're reattaching to an existing pid, so don't call
613 # the superconstructor (we don't want to kick off a new
614 # process)
615 pass
616 else:
mblighd64e5702008-04-04 21:39:28 +0000617 super(PidfileRunMonitor, self).__init__(cmd,
618 nice_level, log_file)
mblighbb421852008-03-11 22:36:16 +0000619
620
621 def get_pid(self):
622 pid, exit_status = self.get_pidfile_info()
623 assert pid is not None
624 return pid
625
626
mbligh90a549d2008-03-25 23:52:34 +0000627 def _check_command_line(self, command_line, spacer=' ',
628 print_error=False):
629 results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
630 match = results_dir_arg in command_line
631 if print_error and not match:
632 print '%s not found in %s' % (repr(results_dir_arg),
633 repr(command_line))
634 return match
635
636
637 def _check_proc_fs(self, pid):
mblighbb421852008-03-11 22:36:16 +0000638 cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
639 try:
640 cmdline_file = open(cmdline_path, 'r')
641 cmdline = cmdline_file.read().strip()
642 cmdline_file.close()
643 except IOError:
644 return False
645 # /proc/.../cmdline has \x00 separating args
mbligh90a549d2008-03-25 23:52:34 +0000646 return self._check_command_line(cmdline, spacer='\x00',
647 print_error=True)
mblighbb421852008-03-11 22:36:16 +0000648
649
650 def read_pidfile(self):
651 if not os.path.exists(self.pid_file):
652 return None, None
653 file_obj = open(self.pid_file, 'r')
654 lines = file_obj.readlines()
655 file_obj.close()
656 assert 1 <= len(lines) <= 2
657 try:
658 pid = int(lines[0])
659 exit_status = None
660 if len(lines) == 2:
661 exit_status = int(lines[1])
662 except ValueError, exc:
663 raise Exception('Corrupt pid file: ' + str(exc.args))
664
665 return pid, exit_status
666
667
mbligh90a549d2008-03-25 23:52:34 +0000668 def _find_autoserv_proc(self):
669 autoserv_procs = Dispatcher.find_autoservs()
670 for pid, args in autoserv_procs.iteritems():
671 if self._check_command_line(args):
672 return pid, args
673 return None, None
674
675
mblighbb421852008-03-11 22:36:16 +0000676 def get_pidfile_info(self):
677 """\
678 Returns:
679 None, None if autoserv has not yet run
680 pid, None if autoserv is running
681 pid, exit_status if autoserv has completed
682 """
683 if self.lost_process:
684 return self.pid, self.exit_status
685
686 pid, exit_status = self.read_pidfile()
687
mbligh90a549d2008-03-25 23:52:34 +0000688 if pid is None:
689 return self._handle_no_pid()
690
691 if exit_status is None:
692 # double check whether or not autoserv is running
693 proc_running = self._check_proc_fs(pid)
694 if proc_running:
695 return pid, exit_status
696
697 # pid but no process - maybe process *just* exited
mblighbb421852008-03-11 22:36:16 +0000698 pid, exit_status = self.read_pidfile()
mbligh90a549d2008-03-25 23:52:34 +0000699 if exit_status is None:
mblighbb421852008-03-11 22:36:16 +0000700 # autoserv exited without writing an exit code
701 # to the pidfile
702 error = ('autoserv died without writing exit '
703 'code')
704 message = error + '\nPid: %s\nPidfile: %s' % (
705 pid, self.pid_file)
706 print message
707 send_notify_email(error, message)
mbligh90a549d2008-03-25 23:52:34 +0000708 self.on_lost_process(pid)
mblighbb421852008-03-11 22:36:16 +0000709 return self.pid, self.exit_status
710
711 return pid, exit_status
712
713
mbligh90a549d2008-03-25 23:52:34 +0000714 def _handle_no_pid(self):
715 """\
716 Called when no pidfile is found or no pid is in the pidfile.
717 """
718 # is autoserv running?
719 pid, args = self._find_autoserv_proc()
720 if pid is None:
721 # no autoserv process running
722 message = 'No pid found at ' + self.pid_file
723 else:
724 message = ("Process %d (%s) hasn't written pidfile %s" %
725 (pid, args, self.pid_file))
726
727 print message
728 if time.time() - self.start_time > PIDFILE_TIMEOUT:
729 send_notify_email('Process has failed to write pidfile',
730 message)
731 if pid is not None:
732 kill_autoserv(pid)
733 else:
734 pid = 0
735 self.on_lost_process(pid)
736 return self.pid, self.exit_status
737
738 return None, None
739
740
741 def on_lost_process(self, pid):
742 """\
743 Called when autoserv has exited without writing an exit status,
744 or we've timed out waiting for autoserv to write a pid to the
745 pidfile. In either case, we just return failure and the caller
746 should signal some kind of warning.
747
748 pid is unimportant here, as it shouldn't be used by anyone.
749 """
750 self.lost_process = True
751 self.pid = pid
752 self.exit_status = 1
753
754
mblighbb421852008-03-11 22:36:16 +0000755 def exit_code(self):
756 pid, exit_code = self.get_pidfile_info()
mblighbb421852008-03-11 22:36:16 +0000757 return exit_code
758
759
mbligh36768f02008-02-22 18:28:33 +0000760class Agent(object):
mblighd5c95802008-03-05 00:33:46 +0000761 def __init__(self, tasks, queue_entry_ids=[]):
mbligh36768f02008-02-22 18:28:33 +0000762 self.active_task = None
763 self.queue = Queue.Queue(0)
764 self.dispatcher = None
mblighd5c95802008-03-05 00:33:46 +0000765 self.queue_entry_ids = queue_entry_ids
mbligh36768f02008-02-22 18:28:33 +0000766
767 for task in tasks:
768 self.add_task(task)
769
770
771 def add_task(self, task):
772 self.queue.put_nowait(task)
773 task.agent = self
774
775
776 def tick(self):
777 print "agent tick"
778 if self.active_task and not self.active_task.is_done():
779 self.active_task.poll()
780 else:
781 self._next_task();
782
783
784 def _next_task(self):
785 print "agent picking task"
786 if self.active_task:
787 assert self.active_task.is_done()
788
mblighe2586682008-02-29 22:45:46 +0000789 if not self.active_task.success:
790 self.on_task_failure()
791
mbligh36768f02008-02-22 18:28:33 +0000792 self.active_task = None
793 if not self.is_done():
794 self.active_task = self.queue.get_nowait()
795 if self.active_task:
796 self.active_task.start()
797
798
mblighe2586682008-02-29 22:45:46 +0000799 def on_task_failure(self):
mblighe2586682008-02-29 22:45:46 +0000800 self.queue = Queue.Queue(0)
801 for task in self.active_task.failure_tasks:
802 self.add_task(task)
mbligh16c722d2008-03-05 00:58:44 +0000803
mblighe2586682008-02-29 22:45:46 +0000804
mbligh36768f02008-02-22 18:28:33 +0000805 def is_done(self):
806 return self.active_task == None and self.queue.empty()
807
808
809 def start(self):
810 assert self.dispatcher
811
812 self._next_task()
813
mblighd5c95802008-03-05 00:33:46 +0000814
mbligh36768f02008-02-22 18:28:33 +0000815class AgentTask(object):
mbligh16c722d2008-03-05 00:58:44 +0000816 def __init__(self, cmd, failure_tasks = []):
mbligh36768f02008-02-22 18:28:33 +0000817 self.done = False
818 self.failure_tasks = failure_tasks
819 self.started = False
820 self.cmd = cmd
mblighd5c95802008-03-05 00:33:46 +0000821 self.task = None
mbligh36768f02008-02-22 18:28:33 +0000822 self.agent = None
mblighd5c95802008-03-05 00:33:46 +0000823 self.monitor = None
mblighd64e5702008-04-04 21:39:28 +0000824 self.success = None
mbligh36768f02008-02-22 18:28:33 +0000825
826
827 def poll(self):
828 print "poll"
mblighd5c95802008-03-05 00:33:46 +0000829 if self.monitor:
mbligh36768f02008-02-22 18:28:33 +0000830 self.tick(self.monitor.exit_code())
831 else:
832 self.finished(False)
833
834
835 def tick(self, exit_code):
836 if exit_code==None:
837 return
838# print "exit_code was %d" % exit_code
839 if exit_code == 0:
840 success = True
841 else:
842 success = False
843
844 self.finished(success)
845
846
847 def is_done(self):
848 return self.done
849
850
851 def finished(self, success):
852 self.done = True
853 self.success = success
854 self.epilog()
855
856
857 def prolog(self):
858 pass
859
mblighd64e5702008-04-04 21:39:28 +0000860
861 def create_temp_resultsdir(self, suffix=''):
862 self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)
863
864
865 def cleanup(self):
866 if (hasattr(self, 'temp_results_dir') and
867 os.path.exists(self.temp_results_dir)):
868 shutil.rmtree(self.temp_results_dir)
869
mbligh36768f02008-02-22 18:28:33 +0000870
871 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +0000872 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000873
874
875 def start(self):
876 assert self.agent
877
878 if not self.started:
879 self.prolog()
880 self.run()
881
882 self.started = True
883
mblighd64e5702008-04-04 21:39:28 +0000884
mbligh36768f02008-02-22 18:28:33 +0000885 def abort(self):
mblighd5c95802008-03-05 00:33:46 +0000886 if self.monitor:
887 self.monitor.kill()
888 self.done = True
mblighd64e5702008-04-04 21:39:28 +0000889 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000890
891
892 def run(self):
893 if self.cmd:
894 print "agent starting monitor"
mbligh36768f02008-02-22 18:28:33 +0000895 log_file = None
896 if hasattr(self, 'host'):
mblighbb421852008-03-11 22:36:16 +0000897 log_file = os.path.join(RESULTS_DIR, 'hosts',
898 self.host.hostname)
899 self.monitor = RunMonitor(
900 self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
901 log_file = log_file)
mbligh36768f02008-02-22 18:28:33 +0000902
903
904class RepairTask(AgentTask):
mbligh16c722d2008-03-05 00:58:44 +0000905 def __init__(self, host, fail_queue_entry=None):
906 """\
907 fail_queue_entry: queue entry to mark failed if this repair
908 fails.
909 """
mblighd64e5702008-04-04 21:39:28 +0000910 self.create_temp_resultsdir('.repair')
911 cmd = [_autoserv_path , '-R', '-m', host.hostname,
912 '-r', self.temp_results_dir]
mbligh36768f02008-02-22 18:28:33 +0000913 self.host = host
mbligh16c722d2008-03-05 00:58:44 +0000914 self.fail_queue_entry = fail_queue_entry
mblighd64e5702008-04-04 21:39:28 +0000915 super(RepairTask, self).__init__(cmd)
mblighe2586682008-02-29 22:45:46 +0000916
mbligh36768f02008-02-22 18:28:33 +0000917
918 def prolog(self):
919 print "repair_task starting"
920 self.host.set_status('Repairing')
921
922
923 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +0000924 super(RepairTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +0000925 if self.success:
mbligh16c722d2008-03-05 00:58:44 +0000926 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +0000927 else:
mbligh16c722d2008-03-05 00:58:44 +0000928 self.host.set_status('Repair Failed')
929 if self.fail_queue_entry:
930 self.fail_queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +0000931
932
class VerifyTask(AgentTask):
        """Runs 'autoserv -v' against a host to check that it is usable.

        On verify failure a RepairTask runs as the failure task; when the
        entry is tied to a specific host (not a metahost) a failed repair
        also fails the queue entry.
        """
        def __init__(self, queue_entry=None, host=None):
                # exactly one of queue_entry / host must be supplied
                assert bool(queue_entry) != bool(host)

                self.host = host or queue_entry.host
                self.queue_entry = queue_entry

                self.create_temp_resultsdir('.verify')
                cmd = [_autoserv_path,'-v','-m',self.host.hostname,
                       '-r', self.temp_results_dir]

                # only fail the queue entry on repair failure when the entry
                # is bound to this specific host (metahost entries can just
                # be rescheduled elsewhere)
                fail_queue_entry = None
                if queue_entry and not queue_entry.meta_host:
                        fail_queue_entry = queue_entry
                failure_tasks = [RepairTask(self.host, fail_queue_entry)]

                super(VerifyTask, self).__init__(cmd,
                                                 failure_tasks=failure_tasks)


        def prolog(self):
                print "starting verify on %s" % (self.host.hostname)
                if self.queue_entry:
                        self.queue_entry.set_status('Verifying')
                        # throw away stale output from any earlier verify
                        self.queue_entry.clear_results_dir(
                            self.queue_entry.verify_results_dir())
                self.host.set_status('Verifying')


        def cleanup(self):
                # preserve the verify output before the temp dir is deleted;
                # results are kept on success, or on any outcome for a
                # non-metahost entry
                if not os.path.exists(self.temp_results_dir):
                        return
                if self.queue_entry and (self.success or
                                         not self.queue_entry.meta_host):
                        self.move_results()
                super(VerifyTask, self).cleanup()


        def epilog(self):
                super(VerifyTask, self).epilog()

                if self.success:
                        self.host.set_status('Ready')
                elif self.queue_entry:
                        # verify failed - send the entry back to the queue
                        self.queue_entry.requeue()


        def move_results(self):
                # copy the temporary results into the entry's permanent
                # verify results directory, skipping the autoserv pidfile
                assert self.queue_entry is not None
                target_dir = self.queue_entry.verify_results_dir()
                if not os.path.exists(target_dir):
                        os.makedirs(target_dir)
                files = os.listdir(self.temp_results_dir)
                for filename in files:
                        if filename == AUTOSERV_PID_FILE:
                                continue
                        self.force_move(os.path.join(self.temp_results_dir,
                                                     filename),
                                        os.path.join(target_dir, filename))


        @staticmethod
        def force_move(source, dest):
                """\
                Replacement for shutil.move() that will delete the destination
                if it exists, even if it's a directory.
                """
                if os.path.exists(dest):
                        print ('Warning: removing existing destination file ' +
                               dest)
                        remove_file_or_dir(dest)
                shutil.move(source, dest)
1005
1006
class VerifySynchronousTask(VerifyTask):
        """VerifyTask for entries of synchronous jobs.

        After a successful verify the entry moves to Pending; once every
        entry of the job is pending, the job itself is started.
        """
        def __init__(self, queue_entry):
                super(VerifySynchronousTask, self).__init__(
                    queue_entry = queue_entry)


        def epilog(self):
                super(VerifySynchronousTask, self).epilog()
                if not self.success:
                        return
                if self.queue_entry.job.num_complete() > 0:
                        # some other entry failed verify, and we've
                        # already been marked as stopped
                        return

                self.queue_entry.set_status('Pending')
                job = self.queue_entry.job
                if job.is_ready():
                        agent = job.run(self.queue_entry)
                        self.agent.dispatcher.add_agent(agent)
mblighe2586682008-02-29 22:45:46 +00001026
class QueueTask(AgentTask):
        """Runs the main autoserv command for a set of queue entries and
        records job timing keyvals around it."""
        def __init__(self, job, queue_entries, cmd):
                super(QueueTask, self).__init__(cmd)
                self.job = job
                self.queue_entries = queue_entries


        @staticmethod
        def _write_keyval(results_dir, field, value):
                # append one 'field=value' line to the job's keyval file;
                # %d truncates the float timestamps to whole seconds
                key_path = os.path.join(results_dir, 'keyval')
                keyval_file = open(key_path, 'a')
                print >> keyval_file, '%s=%d' % (field, value)
                keyval_file.close()


        def results_dir(self):
                # every entry handled by one QueueTask shares a results dir,
                # so the first entry's dir is representative
                return self.queue_entries[0].results_dir()


        def run(self):
                """\
                Override AgentTask.run() so we can use a PidfileRunMonitor.
                """
                self.monitor = PidfileRunMonitor(self.results_dir(),
                                                 cmd=self.cmd,
                                                 nice_level=AUTOSERV_NICE_LEVEL)


        def prolog(self):
                # write some job timestamps into the job keyval file
                queued = time.mktime(self.job.created_on.timetuple())
                started = time.time()
                self._write_keyval(self.results_dir(), "job_queued", queued)
                self._write_keyval(self.results_dir(), "job_started", started)
                for queue_entry in self.queue_entries:
                        print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
                        queue_entry.set_status('Running')
                        queue_entry.host.set_status('Running')
                # an asynchronous multi-machine job runs one autoserv per
                # host, so record this entry's host in the .machines file
                if (not self.job.is_synchronous() and
                    self.job.num_machines() > 1):
                        assert len(self.queue_entries) == 1
                        self.job.write_to_machines_file(self.queue_entries[0])


        def epilog(self):
                super(QueueTask, self).epilog()
                if self.success:
                        status = 'Completed'
                else:
                        status = 'Failed'

                # write another timestamp into the job keyval file
                finished = time.time()
                self._write_keyval(self.results_dir(), "job_finished", finished)
                for queue_entry in self.queue_entries:
                        queue_entry.set_status(status)
                        queue_entry.host.set_status('Ready')

                # synchronous/single-machine jobs are parsed once the whole
                # job is done; otherwise each machine parses separately
                if self.job.is_synchronous() or self.job.num_machines()==1:
                        if self.job.is_finished():
                                parse_results(self.job.results_dir())
                else:
                        for queue_entry in self.queue_entries:
                                parse_results(queue_entry.results_dir(),
                                              flags='-l 2')

                print "queue_task finished with %s/%s" % (status, self.success)
1094
1095
class RecoveryQueueTask(QueueTask):
        """QueueTask variant that re-attaches to an autoserv process which
        was already running when the scheduler restarted."""
        def __init__(self, job, queue_entries, run_monitor):
                super(RecoveryQueueTask, self).__init__(
                    job, queue_entries, cmd=None)
                self.run_monitor = run_monitor


        def run(self):
                # adopt the pre-existing monitor instead of launching anything
                self.monitor = self.run_monitor


        def prolog(self):
                # recovering an existing process - don't do prolog
                pass
1110
1111
mbligh36768f02008-02-22 18:28:33 +00001112class RebootTask(AgentTask):
mblighd5c95802008-03-05 00:33:46 +00001113 def __init__(self, host):
1114 global _autoserv_path
1115
1116 # Current implementation of autoserv requires control file
1117 # to be passed on reboot action request. TODO: remove when no
1118 # longer appropriate.
mblighd64e5702008-04-04 21:39:28 +00001119 self.create_temp_resultsdir('.reboot')
mblighd5c95802008-03-05 00:33:46 +00001120 self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
mblighd64e5702008-04-04 21:39:28 +00001121 '-r', self.temp_results_dir, '/dev/null']
mbligh36768f02008-02-22 18:28:33 +00001122 self.host = host
mblighd64e5702008-04-04 21:39:28 +00001123 super(RebootTask, self).__init__(self.cmd,
mbligh16c722d2008-03-05 00:58:44 +00001124 failure_tasks=[RepairTask(host)])
1125
mblighd5c95802008-03-05 00:33:46 +00001126
1127 def prolog(self):
1128 print "starting reboot task for host: %s" % self.host.hostname
1129 self.host.set_status("Rebooting")
1130
mblighd5c95802008-03-05 00:33:46 +00001131
1132class AbortTask(AgentTask):
1133 def __init__(self, queue_entry, agents_to_abort):
1134 self.queue_entry = queue_entry
1135 self.agents_to_abort = agents_to_abort
1136 for agent in agents_to_abort:
1137 agent.dispatcher.remove_agent(agent)
mblighd64e5702008-04-04 21:39:28 +00001138 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001139
1140
mblighd5c95802008-03-05 00:33:46 +00001141 def prolog(self):
1142 print "starting abort on host %s, job %s" % (
1143 self.queue_entry.host_id, self.queue_entry.job_id)
1144 self.queue_entry.set_status('Aborting')
1145
mbligh36768f02008-02-22 18:28:33 +00001146
mblighd5c95802008-03-05 00:33:46 +00001147 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +00001148 super(AbortTask, self).epilog()
mblighd5c95802008-03-05 00:33:46 +00001149 self.queue_entry.set_status('Aborted')
1150 self.success = True
mbligh36768f02008-02-22 18:28:33 +00001151
mblighd64e5702008-04-04 21:39:28 +00001152
mbligh36768f02008-02-22 18:28:33 +00001153 def run(self):
mblighd5c95802008-03-05 00:33:46 +00001154 for agent in self.agents_to_abort:
1155 if (agent.active_task):
1156 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001157
1158
class DBObject(object):
        """Lightweight ORM-style base class wrapping one database row.

        Subclasses override _get_table() and pass their ordered column
        list to __init__; each column becomes an attribute on the object.
        """
        def __init__(self, fields, id=None, row=None, new_record=False):
                """\
                fields: ordered column names, starting with 'id'
                id: fetch the existing row with this id
                row: use this already-fetched row instead of querying
                new_record: object represents a row not yet in the database
                """
                # exactly one of id / row must be supplied
                assert (bool(id) != bool(row)) and fields

                self.__table = self._get_table()
                self.__fields = fields

                self.__new_record = new_record

                if row is None:
                        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
                        rows = _db.execute(sql, (id,))
                        if len(rows) == 0:
                                # BUGFIX: this used to raise a bare string,
                                # which is a TypeError on Python >= 2.6;
                                # raise a real exception instead
                                raise Exception(
                                    "row not found (table=%s, id=%s)" %
                                    (self.__table, id))
                        row = rows[0]

                assert len(row)==len(fields), (
                    "table = %s, row = %s/%d, fields = %s/%d" % (
                    self.__table, row, len(row), fields, len(fields)))

                # expose every column as an attribute, and remember which
                # ones may later be written back via update_field()
                self.__valid_fields = {}
                for i,value in enumerate(row):
                        self.__dict__[fields[i]] = value
                        self.__valid_fields[fields[i]] = True

                del self.__valid_fields['id']


        @classmethod
        def _get_table(cls):
                raise NotImplementedError('Subclasses must override this')


        def count(self, where, table = None):
                """Return the number of rows of 'table' (default: this
                object's table) matching the given WHERE clause."""
                if not table:
                        table = self.__table

                rows = _db.execute("""
                        SELECT count(*) FROM %s
                        WHERE %s
                """ % (table, where))

                assert len(rows) == 1

                return int(rows[0][0])


        def num_cols(self):
                # number of columns this wrapper was constructed with
                return len(self.__fields)


        def update_field(self, field, value):
                """Set one column both on this object and in the database,
                skipping the UPDATE when the value is unchanged."""
                assert self.__valid_fields[field]

                if self.__dict__[field] == value:
                        return

                query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                        (self.__table, field)
                _db.execute(query, (value, self.id))

                self.__dict__[field] = value


        def save(self):
                """Insert this object as a new row; only meaningful when
                constructed with new_record=True."""
                if self.__new_record:
                        keys = self.__fields[1:] # avoid id
                        columns = ','.join([str(key) for key in keys])
                        # BUGFIX: values used to be spliced into the SQL as
                        # '"%s"' strings, which breaks on embedded quotes and
                        # is injection-prone; use bound parameters instead
                        placeholders = ','.join(['%s'] * len(keys))
                        query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                                (self.__table, columns, placeholders)
                        _db.execute(query,
                                    [self.__dict__[key] for key in keys])


        def delete(self):
                """Delete this object's row from the database."""
                query = 'DELETE FROM %s WHERE id=%%s' % self.__table
                _db.execute(query, (self.id,))


        @classmethod
        def fetch(cls, where, params=()):
                """Yield an instance of cls for every row matching 'where'
                (with optional bound parameters)."""
                rows = _db.execute(
                    'SELECT * FROM %s WHERE %s' % (cls._get_table(), where),
                    params)
                for row in rows:
                        yield cls(row=row)
1247
mbligh36768f02008-02-22 18:28:33 +00001248
class IneligibleHostQueue(DBObject):
        """Row wrapper for ineligible_host_queues, which records that a
        given host must not pick up a given job's metahost entries."""
        def __init__(self, id=None, row=None, new_record=None):
                fields = ['id', 'job_id', 'host_id']
                super(IneligibleHostQueue, self).__init__(
                    fields, id=id, row=row, new_record=new_record)


        @classmethod
        def _get_table(cls):
                return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001259
1260
class Host(DBObject):
        """Row wrapper for the hosts table."""
        def __init__(self, id=None, row=None):
                fields = ['id', 'hostname', 'locked', 'synch_id','status',
                          'invalid']
                super(Host, self).__init__(fields, id=id, row=row)


        @classmethod
        def _get_table(cls):
                return 'hosts'


        def current_task(self):
                """Return the single active, incomplete HostQueueEntry on
                this host, or None if the host is idle."""
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                        """, (self.id,))

                if len(rows) == 0:
                        return None
                else:
                        # a host can only be working on one entry at a time
                        assert len(rows) == 1
                        results = rows[0];
#               print "current = %s" % results
                        return HostQueueEntry(row=results)


        def next_queue_entries(self):
                """Return a list holding the highest-priority queue entry
                eligible to run on this host, or None.

                An entry is eligible when it names this host directly, or
                when it is a metahost entry whose label this host carries
                and whose job has not blocked this host via
                ineligible_host_queues.
                """
                if self.locked:
                        print "%s locked, not queuing" % self.hostname
                        return None
#       print "%s/%s looking for work" % (self.hostname, self.platform_id)
                rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE ((host_id=%s) OR (meta_host IS NOT null AND
                (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
                )
                AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
                )))
                AND NOT complete AND NOT active
                ORDER BY priority DESC, meta_host, id
                LIMIT 1
                """, (self.id,self.id, self.id))

                if len(rows) == 0:
                        return None
                else:
                        return [HostQueueEntry(row=i) for i in rows]

        def yield_work(self):
                """Requeue whatever entry this host is currently running."""
                print "%s yielding work" % self.hostname
                if self.current_task():
                        self.current_task().requeue()

        def set_status(self,status):
                print '%s -> %s' % (self.hostname, status)
                self.update_field('status',status)
1321
1322
class HostQueueEntry(DBObject):
        """Row wrapper for host_queue_entries: one (job, host) pairing."""
        def __init__(self, id=None, row=None):
                assert id or row
                fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                          'meta_host', 'active', 'complete']
                super(HostQueueEntry, self).__init__(fields, id=id, row=row)
                self.job = Job(self.job_id)

                if self.host_id:
                        self.host = Host(self.host_id)
                else:
                        self.host = None

                # per-entry log of host assignment/release, kept alongside
                # the job's results
                self.queue_log_path = os.path.join(self.job.results_dir(),
                                                   'queue.log.' + str(self.id))


        @classmethod
        def _get_table(cls):
                return 'host_queue_entries'


        def set_host(self, host):
                """Assign a host to this entry (or release it when host is
                None), logging the change and blocking/unblocking the host
                for this job."""
                if host:
                        self.queue_log_record('Assigning host ' + host.hostname)
                        self.update_field('host_id', host.id)
                        self.update_field('active', True)
                        self.block_host(host.id)
                else:
                        self.queue_log_record('Releasing host')
                        self.unblock_host(self.host.id)
                        self.update_field('host_id', None)

                self.host = host


        def get_host(self):
                return self.host


        def queue_log_record(self, log_line):
                # timestamped, unbuffered append so entries survive a crash
                now = str(datetime.datetime.now())
                queue_log = open(self.queue_log_path, 'a', 0)
                queue_log.write(now + ' ' + log_line + '\n')
                queue_log.close()


        def block_host(self, host_id):
                """Stop this job's other metahost entries from also taking
                the given host."""
                print "creating block %s/%s" % (self.job.id, host_id)
                row = [0, self.job.id, host_id]
                block = IneligibleHostQueue(row=row, new_record=True)
                block.save()


        def unblock_host(self, host_id):
                """Remove any block of the given host for this job."""
                print "removing block %s/%s" % (self.job.id, host_id)
                blocks = IneligibleHostQueue.fetch(
                    'job_id=%d and host_id=%d' % (self.job.id, host_id))
                for block in blocks:
                        block.delete()


        def results_dir(self):
                """This entry's results dir: the job dir for synchronous or
                single-machine jobs, else a per-host subdirectory."""
                if self.job.is_synchronous() or self.job.num_machines() == 1:
                        return self.job.job_dir
                else:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)


        def verify_results_dir(self):
                # note the condition differs from results_dir(): verify
                # output for any multi-machine job goes in a per-host subdir
                if self.job.is_synchronous() or self.job.num_machines() > 1:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)
                else:
                        return self.job.job_dir


        def set_status(self, status):
                """Set the status column, keeping the active/complete flags
                consistent with the new status."""
                self.update_field('status', status)
                if self.host:
                        hostname = self.host.hostname
                else:
                        hostname = 'no host'
                print "%s/%d status -> %s" % (hostname, self.id, self.status)
                if status in ['Queued']:
                        self.update_field('complete', False)
                        self.update_field('active', False)

                if status in ['Pending', 'Running', 'Verifying', 'Starting',
                              'Abort', 'Aborting']:
                        self.update_field('complete', False)
                        self.update_field('active', True)

                if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
                        self.update_field('complete', True)
                        self.update_field('active', False)


        def run(self,assigned_host=None):
                """Kick off this entry's job; metahost entries get their
                host supplied here by the dispatcher."""
                if self.meta_host:
                        assert assigned_host
                        # ensure results dir exists for the queue log
                        self.job.create_results_dir()
                        self.set_host(assigned_host)

                print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                                self.meta_host, self.host.hostname, self.status)

                return self.job.run(queue_entry=self)

        def requeue(self):
                """Send this entry back to Queued; metahost entries also
                give their host back."""
                self.set_status('Queued')

                if self.meta_host:
                        self.set_host(None)


        def handle_host_failure(self):
                """\
                Called when this queue entry's host has failed verification and
                repair.
                """
                assert not self.meta_host
                self.set_status('Failed')
                if self.job.is_synchronous():
                        self.job.stop_all_entries()


        def clear_results_dir(self, results_dir=None, dont_delete_files=False):
                """Empty the given results dir (default: this entry's own).
                With dont_delete_files, contents are moved aside into a temp
                dir instead of being deleted."""
                results_dir = results_dir or self.results_dir()
                if not os.path.exists(results_dir):
                        return
                if dont_delete_files:
                        temp_dir = tempfile.mkdtemp(suffix='.clear_results')
                        print 'Moving results from %s to %s' % (results_dir,
                                                                temp_dir)
                for filename in os.listdir(results_dir):
                        path = os.path.join(results_dir, filename)
                        if dont_delete_files:
                                shutil.move(path,
                                            os.path.join(temp_dir, filename))
                        else:
                                remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001469
1470
class Job(DBObject):
        """Row wrapper for the jobs table, plus the logic for laying out
        results directories and launching autoserv."""
        def __init__(self, id=None, row=None):
                assert id or row
                super(Job, self).__init__(
                    ['id','owner','name','priority',
                     'control_file','control_type','created_on',
                     'synch_type', 'synch_count','synchronizing'],
                    id=id, row=row)

                # results live under RESULTS_DIR/<id>-<owner>
                self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                                    self.owner))


        @classmethod
        def _get_table(cls):
                return 'jobs'


        def is_server_job(self):
                # control_type 2 denotes a client-side control file
                return self.control_type != 2


        def get_host_queue_entries(self):
                """Return all HostQueueEntry objects belonging to this job
                (a job always has at least one)."""
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries
                        WHERE job_id= %s
                """, (self.id,))
                entries = [HostQueueEntry(row=i) for i in rows]

                assert len(entries)>0

                return entries


        def set_status(self, status, update_queues=False):
                """Set the job's status; with update_queues, propagate the
                status to every queue entry of the job as well."""
                self.update_field('status',status)

                if update_queues:
                        for queue_entry in self.get_host_queue_entries():
                                queue_entry.set_status(status)


        def is_synchronous(self):
                # synch_type 2 means all machines must run in lockstep
                return self.synch_type == 2


        def is_ready(self):
                """A job is ready to run when it is asynchronous, or when
                synch_count of its entries have reached Pending."""
                if not self.is_synchronous():
                        return True
                sql = "job_id=%s AND status='Pending'" % self.id
                count = self.count(sql, table='host_queue_entries')
                return (count == self.synch_count)


        def ready_to_synchronize(self):
                # heuristic
                queue_entries = self.get_host_queue_entries()
                count = 0
                for queue_entry in queue_entries:
                        if queue_entry.status == 'Pending':
                                count += 1

                return (count/self.synch_count >= 0.5)


        def start_synchronizing(self):
                self.update_field('synchronizing', True)


        def results_dir(self):
                return self.job_dir

        def num_machines(self, clause = None):
                """Count this job's queue entries, optionally restricted by
                an extra SQL clause."""
                sql = "job_id=%s" % self.id
                if clause:
                        sql += " AND (%s)" % clause
                return self.count(sql, table='host_queue_entries')


        def num_queued(self):
                return self.num_machines('not complete')


        def num_active(self):
                return self.num_machines('active')


        def num_complete(self):
                return self.num_machines('complete')


        def is_finished(self):
                left = self.num_queued()
                print "%s: %s machines left" % (self.name, left)
                return left==0

        def stop_synchronizing(self):
                self.update_field('synchronizing', False)
                self.set_status('Queued', update_queues = False)


        def stop_all_entries(self):
                """Mark every incomplete entry Stopped; used when one entry
                of a synchronous job fails verify/repair."""
                for child_entry in self.get_host_queue_entries():
                        if not child_entry.complete:
                                child_entry.set_status('Stopped')


        def write_to_machines_file(self, queue_entry):
                # append this entry's host to the job's .machines file
                # (asynchronous multi-machine jobs only)
                hostname = queue_entry.get_host().hostname
                print "writing %s to job %s machines file" % (hostname, self.id)
                file_path = os.path.join(self.job_dir, '.machines')
                mf = open(file_path, 'a')
                mf.write("%s\n" % queue_entry.get_host().hostname)
                mf.close()


        def create_results_dir(self, queue_entry=None):
                """Ensure the job results dir exists; return the entry's
                results dir when an entry is given, else the job dir."""
                print "create: active: %s complete %s" % (self.num_active(),
                                                          self.num_complete())

                if not os.path.exists(self.job_dir):
                        os.makedirs(self.job_dir)

                if queue_entry:
                        return queue_entry.results_dir()
                return self.job_dir


        def run(self, queue_entry):
                """Build and return an Agent that runs this job for the
                given queue entry. A synchronous job that is not yet ready
                gets an Agent that only verifies the entry."""
                results_dir = self.create_results_dir(queue_entry)

                if self.is_synchronous():
                        if not self.is_ready():
                                return Agent([VerifySynchronousTask(
                                                queue_entry = queue_entry)],
                                             [queue_entry.id])

                queue_entry.set_status('Starting')

                # write the control file to a temp path for autoserv.
                # NOTE(review): os.tmpnam is race-prone and the file is never
                # closed or removed here -- presumably left for autoserv to
                # read; confirm before changing.
                ctrl = open(os.tmpnam(), 'w')
                if self.control_file:
                        ctrl.write(self.control_file)
                else:
                        ctrl.write("")
                ctrl.flush()

                if self.is_synchronous():
                        queue_entries = self.get_host_queue_entries()
                else:
                        assert queue_entry
                        queue_entries = [queue_entry]
                hostnames = ','.join([entry.get_host().hostname
                                      for entry in queue_entries])

                # determine the job tag
                if self.is_synchronous() or self.num_machines() == 1:
                        job_name = "%s-%s" % (self.id, self.owner)
                else:
                        job_name = "%s-%s/%s" % (self.id, self.owner,
                                                 hostnames)

                params = [_autoserv_path, '-P', job_name, '-p', '-n',
                          '-r', os.path.abspath(results_dir),
                          '-b', '-u', self.owner, '-l', self.name,
                          '-m', hostnames, ctrl.name]

                if not self.is_server_job():
                        params.append('-c')

                tasks = []
                if not self.is_synchronous():
                        # asynchronous entries get verified just before running
                        tasks.append(VerifyTask(queue_entry))

                tasks.append(QueueTask(job = self,
                                       queue_entries = queue_entries,
                                       cmd = params))

                ids = []
                for entry in queue_entries:
                        ids.append(entry.id)

                agent = Agent(tasks, ids)

                return agent
1655
1656
# script entry point: main() is defined near the top of this file
if __name__ == '__main__':
        main()