blob: dd2c55c6d83b00cc37de974b0ba3b5d7e987012b [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighcadb3532008-04-15 17:46:26 +00009import optparse, signal, smtplib, socket, datetime, stat, pwd, errno
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# default results location; overwritten with the command-line arg in main()
RESULTS_DIR = '.'
# nice(1) level applied to spawned autoserv processes
AUTOSERV_NICE_LEVEL = 10

# autotest root: parent of this script's directory, unless overridden below
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# name of the pidfile autoserv writes inside its results directory
AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# module-level state, populated by main()/init()
_db = None                            # DatabaseConn created in init()
_shutdown = False                     # set True by the SIGINT handler
_notify_email = None                  # optional address for error reports
_autoserv_path = 'autoserv'           # swapped for a dummy under --test
_testing_mode = False
_global_config_section = 'SCHEDULER'
mbligh36768f02008-02-22 18:28:33 +000035
36
def main():
    """
    Entry point: parse command-line options, initialize module globals
    and the database connection, then run the dispatcher loop until a
    SIGINT requests shutdown.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    if options.test:
        # use a dummy autoserv binary and skip results parsing
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        # main scheduler loop: one tick roughly every 20 seconds until
        # the SIGINT handler flips _shutdown
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +000082
83
84def handle_sigint(signum, frame):
85 global _shutdown
86 _shutdown = True
87 print "Shutdown request received."
88
89
def init(logfile):
    """
    One-time startup: redirect output to the logfile (if given), put the
    autotest server dir on PATH, open the global database connection and
    install the SIGINT handler.
    """
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    # make sure spawned tools (autoserv etc.) resolve from the server dir
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
104
105
def enable_logging(logfile):
    """
    Append all stdout to *logfile* and all stderr to *logfile*.err,
    unbuffered.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    # redirect at the file-descriptor level so spawned child processes
    # inherit the redirection too
    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    # also rebind the Python-level objects for this process's own prints
    sys.stdout = out_fd
    sys.stderr = err_fd
118
119
def idle_hosts():
    """Return Host objects for unlocked, valid, Ready (or status-less)
    hosts that have no active queue entry but do have pending work,
    either directly assigned or reachable through a meta-host label.
    """
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND invalid=false
        AND (h.status IS null OR h.status='Ready') """)
    return [Host(row=row) for row in rows]
133
def queue_entries_to_abort():
    """Return HostQueueEntry objects for every entry whose status has
    been set to 'Abort' by the frontend."""
    abort_rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    return [HostQueueEntry(row=row) for row in abort_rows]
mbligh36768f02008-02-22 18:28:33 +0000140
def remove_file_or_dir(path):
    """Delete *path*: recursively when it is a directory, otherwise
    unlink the single file.  Raises OSError if *path* does not exist."""
    mode = os.stat(path).st_mode
    if stat.S_ISDIR(mode):
        # directory: remove the whole tree
        shutil.rmtree(path)
    else:
        # plain file (or other non-directory entry)
        os.remove(path)
148
149
class DatabaseConn:
    """
    Auto-reconnecting wrapper around a MySQLdb connection to the
    frontend (AUTOTEST_WEB) database.  Both connect() and execute()
    retry forever on connection failure, sleeping between attempts.
    """
    def __init__(self):
        # seconds to sleep between reconnection attempts
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        # register a converter so Python bools are sent to MySQL as
        # '0'/'1' rather than 'True'/'False'
        import MySQLdb.converters
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)

        self.connect()


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self):
        # drop any stale connection, then retry until connected
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        global _testing_mode
        if _testing_mode:
            # point at a scratch schema so tests can't touch real data
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=DB_HOST, user=DB_USER, passwd=DB_PASS,
                    db=DB_SCHEMA, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        # safe to call when not connected
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        """
        Run a query and return all rows, reconnecting and re-issuing
        the query (forever) if the connection has died.
        """
        while (True):
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()
mbligh36768f02008-02-22 18:28:33 +0000217
218
def generate_parse_command(results_dir, flags=""):
    """Build the shell command line that runs the tko parser over
    *results_dir* in the background, logging to .parse.log inside it."""
    parser_bin = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    log_path = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    # -o: results_dir is a single job dir; all output -> .parse.log; '&'
    # detaches the parser so the scheduler is not blocked
    return "%s %s -r -o %s > %s 2>&1 &" % (parser_bin, flags,
                                           results_dir, log_path)
224
225
def parse_results(results_dir, flags=""):
    """Kick off the tko parser for *results_dir*.  No-op in test mode."""
    if not _testing_mode:
        os.system(generate_parse_command(results_dir, flags))
mbligh36768f02008-02-22 18:28:33 +0000230
231
def send_notify_email(subject, message):
    """Email *message* (stamped with host, pid and time) to the address
    configured via notify_email.  Silently does nothing when no address
    is configured."""
    if not _notify_email:
        return

    stamped = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                    time.strftime("%X %x"), message)
    sender = pwd.getpwuid(os.getuid())[0] # see os.getlogin() online docs
    payload = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, stamped)
    server = smtplib.SMTP('localhost')
    server.sendmail(sender, _notify_email, payload)
    server.quit()
244
245
def log_stacktrace(reason):
    """
    Write the current exception's traceback, prefixed with *reason*,
    to stderr and email it to the configured notify address.

    Must be called from inside an except block (uses sys.exc_info()).
    """
    # don't shadow the 'type' and 'str' builtins like the old code did
    exc_type, exc_value, exc_tb = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value,
                                                  exc_tb))

    sys.stderr.write("\n%s\n" % message)
    send_notify_email("monitor_db exception", message)
mbligh36768f02008-02-22 18:28:33 +0000253
mblighbb421852008-03-11 22:36:16 +0000254
def get_proc_poll_fn(pid):
    """Return a subprocess.Popen.poll()-style callable for a process we
    did not spawn ourselves: None while /proc/<pid> exists, else 0."""
    proc_path = os.path.join('/proc', str(pid))

    def poll_fn():
        if not os.path.exists(proc_path):
            return 0 # we can't get a real exit code
        return None

    return poll_fn
262
263
def kill_autoserv(pid, poll_fn=None):
    """
    SIGTERM the process *pid* if it is still running.

    poll_fn: optional callable mimicking subprocess.Popen.poll()
    (returns None while the process is alive); defaults to a
    /proc-existence check for processes we did not spawn ourselves.
    """
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() == None:
        # SIGCONT first in case the process is stopped, so the
        # subsequent SIGTERM is actually delivered and handled
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)
mbligh36768f02008-02-22 18:28:33 +0000271
272
273class Dispatcher:
mbligh90a549d2008-03-25 23:52:34 +0000274 autoserv_procs_cache = None
showardec113162008-05-08 00:52:49 +0000275 max_running_agents = global_config.global_config.get_config_value(
276 _global_config_section, 'max_running_jobs', type=int)
277 max_jobs_started_per_cycle = (
278 global_config.global_config.get_config_value(
279 _global_config_section, 'max_jobs_started_per_cycle', type=int))
mbligh90a549d2008-03-25 23:52:34 +0000280
mblighbb421852008-03-11 22:36:16 +0000281 def __init__(self):
mbligh36768f02008-02-22 18:28:33 +0000282 self._agents = []
mbligh36768f02008-02-22 18:28:33 +0000283
mbligh36768f02008-02-22 18:28:33 +0000284
mblighbb421852008-03-11 22:36:16 +0000285 def do_initial_recovery(self, recover_hosts=True):
286 # always recover processes
287 self._recover_processes()
288
289 if recover_hosts:
290 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000291
292
293 def tick(self):
mbligh90a549d2008-03-25 23:52:34 +0000294 Dispatcher.autoserv_procs_cache = None
mblighbb421852008-03-11 22:36:16 +0000295 self._find_aborting()
296 self._find_more_work()
mbligh36768f02008-02-22 18:28:33 +0000297 self._handle_agents()
mbligh62ba2ed2008-04-30 17:09:25 +0000298 self._clear_inactive_blocks()
mbligh36768f02008-02-22 18:28:33 +0000299
300
301 def add_agent(self, agent):
302 self._agents.append(agent)
303 agent.dispatcher = self
mblighd5c95802008-03-05 00:33:46 +0000304
305 # Find agent corresponding to the specified queue_entry
306 def get_agents(self, queue_entry):
307 res_agents = []
308 for agent in self._agents:
309 if queue_entry.id in agent.queue_entry_ids:
310 res_agents.append(agent)
311 return res_agents
312
313
314 def remove_agent(self, agent):
315 self._agents.remove(agent)
mbligh36768f02008-02-22 18:28:33 +0000316
317
showardec113162008-05-08 00:52:49 +0000318 def num_started_agents(self):
319 return len([agent for agent in self._agents
320 if agent.is_started()])
321
322
mbligh90a549d2008-03-25 23:52:34 +0000323 @classmethod
324 def find_autoservs(cls, orphans_only=False):
mblighbb421852008-03-11 22:36:16 +0000325 """\
326 Returns a dict mapping pids to command lines for root autoserv
mbligh90a549d2008-03-25 23:52:34 +0000327 processes. If orphans_only=True, return only processes that
328 have been orphaned (i.e. parent pid = 1).
mblighbb421852008-03-11 22:36:16 +0000329 """
mbligh90a549d2008-03-25 23:52:34 +0000330 if cls.autoserv_procs_cache is not None:
331 return cls.autoserv_procs_cache
332
mblighbb421852008-03-11 22:36:16 +0000333 proc = subprocess.Popen(
mbligh90a549d2008-03-25 23:52:34 +0000334 ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
mblighbb421852008-03-11 22:36:16 +0000335 stdout=subprocess.PIPE)
336 # split each line into the four columns output by ps
mbligh90a549d2008-03-25 23:52:34 +0000337 procs = [line.split(None, 4) for line in
mblighbb421852008-03-11 22:36:16 +0000338 proc.communicate()[0].splitlines()]
mbligh90a549d2008-03-25 23:52:34 +0000339 autoserv_procs = {}
340 for proc in procs:
341 # check ppid == 1 for orphans
342 if orphans_only and proc[2] != 1:
343 continue
344 # only root autoserv processes have pgid == pid
345 if (proc[3] == 'autoserv' and # comm
346 proc[1] == proc[0]): # pgid == pid
347 # map pid to args
348 autoserv_procs[int(proc[0])] = proc[4]
349 cls.autoserv_procs_cache = autoserv_procs
350 return autoserv_procs
mblighbb421852008-03-11 22:36:16 +0000351
352
353 def recover_queue_entry(self, queue_entry, run_monitor):
354 job = queue_entry.job
355 if job.is_synchronous():
356 all_queue_entries = job.get_host_queue_entries()
357 else:
358 all_queue_entries = [queue_entry]
359 all_queue_entry_ids = [queue_entry.id for queue_entry
360 in all_queue_entries]
361 queue_task = RecoveryQueueTask(
362 job=queue_entry.job,
363 queue_entries=all_queue_entries,
364 run_monitor=run_monitor)
365 self.add_agent(Agent(tasks=[queue_task],
366 queue_entry_ids=all_queue_entry_ids))
367
368
369 def _recover_processes(self):
mbligh90a549d2008-03-25 23:52:34 +0000370 orphans = self.find_autoservs(orphans_only=True)
mblighbb421852008-03-11 22:36:16 +0000371
372 # first, recover running queue entries
373 rows = _db.execute("""SELECT * FROM host_queue_entries
374 WHERE status = 'Running'""")
375 queue_entries = [HostQueueEntry(row=i) for i in rows]
376 requeue_entries = []
377 recovered_entry_ids = set()
378 for queue_entry in queue_entries:
379 run_monitor = PidfileRunMonitor(
380 queue_entry.results_dir())
381 pid, exit_code = run_monitor.get_pidfile_info()
382 if pid is None:
383 # autoserv apparently never got run, so requeue
384 requeue_entries.append(queue_entry)
385 continue
386 if queue_entry.id in recovered_entry_ids:
387 # synchronous job we've already recovered
388 continue
389 print 'Recovering queue entry %d (pid %d)' % (
390 queue_entry.id, pid)
391 job = queue_entry.job
392 if job.is_synchronous():
393 for entry in job.get_host_queue_entries():
394 assert entry.active
395 recovered_entry_ids.add(entry.id)
396 self.recover_queue_entry(queue_entry,
397 run_monitor)
398 orphans.pop(pid, None)
399
400 # and requeue other active queue entries
401 rows = _db.execute("""SELECT * FROM host_queue_entries
402 WHERE active AND NOT complete
403 AND status != 'Running'
404 AND status != 'Pending'
405 AND status != 'Abort'
406 AND status != 'Aborting'""")
407 queue_entries = [HostQueueEntry(row=i) for i in rows]
408 for queue_entry in queue_entries + requeue_entries:
409 print 'Requeuing running QE %d' % queue_entry.id
mbligh90a549d2008-03-25 23:52:34 +0000410 queue_entry.clear_results_dir(dont_delete_files=True)
mblighbb421852008-03-11 22:36:16 +0000411 queue_entry.requeue()
412
413
414 # now kill any remaining autoserv processes
415 for pid in orphans.keys():
416 print 'Killing orphan %d (%s)' % (pid, orphans[pid])
417 kill_autoserv(pid)
418
419 # recover aborting tasks
mbligh90a549d2008-03-25 23:52:34 +0000420 rebooting_host_ids = set()
mblighd5c95802008-03-05 00:33:46 +0000421 rows = _db.execute("""SELECT * FROM host_queue_entries
422 WHERE status='Abort' or status='Aborting'""")
mblighbb421852008-03-11 22:36:16 +0000423 queue_entries = [HostQueueEntry(row=i) for i in rows]
424 for queue_entry in queue_entries:
mbligh90a549d2008-03-25 23:52:34 +0000425 print 'Recovering aborting QE %d' % queue_entry.id
mblighbb421852008-03-11 22:36:16 +0000426 queue_host = queue_entry.get_host()
427 reboot_task = RebootTask(queue_host)
428 verify_task = VerifyTask(host = queue_host)
429 self.add_agent(Agent(tasks=[reboot_task,
430 verify_task],
431 queue_entry_ids=[queue_entry.id]))
432 queue_entry.set_status('Aborted')
433 # Secure the host from being picked up
434 queue_host.set_status('Rebooting')
mbligh90a549d2008-03-25 23:52:34 +0000435 rebooting_host_ids.add(queue_host.id)
mblighd5c95802008-03-05 00:33:46 +0000436
mblighbb421852008-03-11 22:36:16 +0000437 # reverify hosts that were in the middle of verify, repair or
438 # reboot
mbligh90a549d2008-03-25 23:52:34 +0000439 self._reverify_hosts_where("""(status = 'Repairing' OR
440 status = 'Verifying' OR
441 status = 'Rebooting')""",
442 exclude_ids=rebooting_host_ids)
443
444 # finally, recover "Running" hosts with no active queue entries,
445 # although this should never happen
446 message = ('Recovering running host %s - this probably '
447 'indicates a scheduler bug')
448 self._reverify_hosts_where("""status = 'Running' AND
449 id NOT IN (SELECT host_id
450 FROM host_queue_entries
451 WHERE active)""",
452 print_message=message)
453
454
455 def _reverify_hosts_where(self, where,
456 print_message='Reverifying host %s',
457 exclude_ids=set()):
mbligh5244cbb2008-04-24 20:39:52 +0000458 rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
459 'invalid = 0 AND ' + where)
mblighbb421852008-03-11 22:36:16 +0000460 hosts = [Host(row=i) for i in rows]
461 for host in hosts:
mbligh90a549d2008-03-25 23:52:34 +0000462 if host.id in exclude_ids:
463 continue
464 if print_message is not None:
465 print print_message % host.hostname
466 verify_task = VerifyTask(host = host)
467 self.add_agent(Agent(tasks = [verify_task]))
mblighbb421852008-03-11 22:36:16 +0000468
469
470 def _recover_hosts(self):
mbligh90a549d2008-03-25 23:52:34 +0000471 # recover "Repair Failed" hosts
472 message = 'Reverifying dead host %s'
473 self._reverify_hosts_where("status = 'Repair Failed'",
474 print_message=message)
mbligh36768f02008-02-22 18:28:33 +0000475
476
mbligh62ba2ed2008-04-30 17:09:25 +0000477 def _clear_inactive_blocks(self):
478 """
479 Clear out blocks for all completed jobs.
480 """
481 # this would be simpler using NOT IN (subquery), but MySQL
482 # treats all IN subqueries as dependent, so this optimizes much
483 # better
484 _db.execute("""
485 DELETE ihq FROM ineligible_host_queues ihq
486 LEFT JOIN (SELECT job_id FROM host_queue_entries
487 WHERE NOT complete) hqe
488 USING (job_id) WHERE hqe.job_id IS NULL""")
489
490
mbligh36768f02008-02-22 18:28:33 +0000491 def _find_more_work(self):
492 print "finding work"
493
mbligh36768f02008-02-22 18:28:33 +0000494 for host in idle_hosts():
495 tasks = host.next_queue_entries()
496 if tasks:
497 for next in tasks:
498 try:
499 agent = next.run(assigned_host=host)
500 if agent:
501 self.add_agent(agent)
mbligh36768f02008-02-22 18:28:33 +0000502 break
503 except:
504 next.set_status('Failed')
505
506# if next.host:
507# next.host.set_status('Ready')
508
509 log_stacktrace("task_id = %d" % next.id)
510
511
mblighd5c95802008-03-05 00:33:46 +0000512 def _find_aborting(self):
513 num_aborted = 0
514 # Find jobs that are aborting
515 for entry in queue_entries_to_abort():
516 agents_to_abort = self.get_agents(entry)
517 entry_host = entry.get_host()
518 reboot_task = RebootTask(entry_host)
519 verify_task = VerifyTask(host = entry_host)
520 tasks = [reboot_task, verify_task]
521 if agents_to_abort:
522 abort_task = AbortTask(entry, agents_to_abort)
523 tasks.insert(0, abort_task)
524 else:
525 entry.set_status('Aborted')
526 # just to make sure this host does not get
527 # taken away
528 entry_host.set_status('Rebooting')
529 self.add_agent(Agent(tasks=tasks,
530 queue_entry_ids = [entry.id]))
531 num_aborted += 1
532 if num_aborted >= 50:
533 break
534
535
mbligh36768f02008-02-22 18:28:33 +0000536 def _handle_agents(self):
537 still_running = []
showardec113162008-05-08 00:52:49 +0000538 num_started = self.num_started_agents()
539 start_new = (num_started < self.max_running_agents)
540 num_started_this_cycle = 0
mbligh36768f02008-02-22 18:28:33 +0000541 for agent in self._agents:
showardec113162008-05-08 00:52:49 +0000542 if not agent.is_started():
543 if not start_new:
544 still_running.append(agent)
545 continue
546 num_started += 1
547 num_started_this_cycle += 1
548 if (num_started >= self.max_running_agents or
549 num_started_this_cycle >=
550 self.max_jobs_started_per_cycle):
551 start_new = False
mbligh36768f02008-02-22 18:28:33 +0000552 agent.tick()
553 if not agent.is_done():
554 still_running.append(agent)
555 else:
556 print "agent finished"
557 self._agents = still_running
showardec113162008-05-08 00:52:49 +0000558 print num_started, 'running agents'
mbligh36768f02008-02-22 18:28:33 +0000559
560
class RunMonitor(object):
    """
    Spawns a command as a subprocess (optionally niced, with stdout and
    stderr appended to a log file) and tracks its exit status.
    """
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        """Launch cmd and return the subprocess.Popen object."""
        if self.nice_level:
            nice_cmd = ['nice','-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                # make sure the log directory exists
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                # banner separating runs within the shared log file
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            # no log file requested, or it failed to open: discard output
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        # the child holds its own copies of these descriptors
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        # self.exit_code is passed as the poll function (it is called,
        # not evaluated): kill_autoserv only signals while it returns
        # None, i.e. while the process is still running
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        # None while the process is running, else its exit status
        return self.proc.poll()
616
617
class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """
622
623
class PidfileRunMonitor(RunMonitor):
    """
    RunMonitor variant for autoserv, which writes its pid (and, on
    completion, its exit status) to a pidfile inside the results
    directory.  Passing cmd=None reattaches to an already-running
    autoserv instead of spawning a new one.
    """
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        # set once we give up on the process (see on_lost_process)
        self.lost_process = False
        # used by _handle_no_pid for the pidfile-write timeout
        self.start_time = time.time()
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            super(PidfileRunMonitor, self).__init__(cmd,
                                                    nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        # a process belongs to this monitor iff its command line
        # contains "-r <our results dir>"
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self, pid):
        # True if /proc/<pid> exists and is our autoserv process
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def read_pidfile(self):
        # returns (pid, exit_status); line 1 of the pidfile is the pid,
        # the optional line 2 is the exit status
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise Exception('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status


    def _find_autoserv_proc(self):
        # scan the process table for an autoserv running against our
        # results dir; returns (pid, args) or (None, None)
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
           None, None if autoserv has not yet run
           pid, None if autoserv is running
           pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                send_notify_email(error, message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            # waited long enough for the pidfile: give up on the
            # process entirely and report failure
            send_notify_email('Process has failed to write pidfile',
                              message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1


    def exit_code(self):
        # None while running, else an int (1 if the process was lost)
        pid, exit_code = self.get_pidfile_info()
        return exit_code
778
779
mbligh36768f02008-02-22 18:28:33 +0000780class Agent(object):
mblighd5c95802008-03-05 00:33:46 +0000781 def __init__(self, tasks, queue_entry_ids=[]):
mbligh36768f02008-02-22 18:28:33 +0000782 self.active_task = None
783 self.queue = Queue.Queue(0)
784 self.dispatcher = None
mblighd5c95802008-03-05 00:33:46 +0000785 self.queue_entry_ids = queue_entry_ids
mbligh36768f02008-02-22 18:28:33 +0000786
787 for task in tasks:
788 self.add_task(task)
789
790
791 def add_task(self, task):
792 self.queue.put_nowait(task)
793 task.agent = self
794
795
796 def tick(self):
797 print "agent tick"
798 if self.active_task and not self.active_task.is_done():
799 self.active_task.poll()
800 else:
801 self._next_task();
802
803
804 def _next_task(self):
805 print "agent picking task"
806 if self.active_task:
807 assert self.active_task.is_done()
808
mblighe2586682008-02-29 22:45:46 +0000809 if not self.active_task.success:
810 self.on_task_failure()
811
mbligh36768f02008-02-22 18:28:33 +0000812 self.active_task = None
813 if not self.is_done():
814 self.active_task = self.queue.get_nowait()
815 if self.active_task:
816 self.active_task.start()
817
818
mblighe2586682008-02-29 22:45:46 +0000819 def on_task_failure(self):
mblighe2586682008-02-29 22:45:46 +0000820 self.queue = Queue.Queue(0)
821 for task in self.active_task.failure_tasks:
822 self.add_task(task)
mbligh16c722d2008-03-05 00:58:44 +0000823
mblighe2586682008-02-29 22:45:46 +0000824
showardec113162008-05-08 00:52:49 +0000825 def is_started(self):
826 return self.active_task is not None
827
828
mbligh36768f02008-02-22 18:28:33 +0000829 def is_done(self):
830 return self.active_task == None and self.queue.empty()
831
832
833 def start(self):
834 assert self.dispatcher
835
836 self._next_task()
837
mblighd5c95802008-03-05 00:33:46 +0000838
mbligh36768f02008-02-22 18:28:33 +0000839class AgentTask(object):
mbligh16c722d2008-03-05 00:58:44 +0000840 def __init__(self, cmd, failure_tasks = []):
mbligh36768f02008-02-22 18:28:33 +0000841 self.done = False
842 self.failure_tasks = failure_tasks
843 self.started = False
844 self.cmd = cmd
mblighd5c95802008-03-05 00:33:46 +0000845 self.task = None
mbligh36768f02008-02-22 18:28:33 +0000846 self.agent = None
mblighd5c95802008-03-05 00:33:46 +0000847 self.monitor = None
mblighd64e5702008-04-04 21:39:28 +0000848 self.success = None
mbligh36768f02008-02-22 18:28:33 +0000849
850
851 def poll(self):
852 print "poll"
mblighd5c95802008-03-05 00:33:46 +0000853 if self.monitor:
mbligh36768f02008-02-22 18:28:33 +0000854 self.tick(self.monitor.exit_code())
855 else:
856 self.finished(False)
857
858
859 def tick(self, exit_code):
860 if exit_code==None:
861 return
862# print "exit_code was %d" % exit_code
863 if exit_code == 0:
864 success = True
865 else:
866 success = False
867
868 self.finished(success)
869
870
871 def is_done(self):
872 return self.done
873
874
875 def finished(self, success):
876 self.done = True
877 self.success = success
878 self.epilog()
879
880
881 def prolog(self):
882 pass
883
mblighd64e5702008-04-04 21:39:28 +0000884
885 def create_temp_resultsdir(self, suffix=''):
886 self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)
887
888
889 def cleanup(self):
890 if (hasattr(self, 'temp_results_dir') and
891 os.path.exists(self.temp_results_dir)):
892 shutil.rmtree(self.temp_results_dir)
893
mbligh36768f02008-02-22 18:28:33 +0000894
895 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +0000896 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000897
898
899 def start(self):
900 assert self.agent
901
902 if not self.started:
903 self.prolog()
904 self.run()
905
906 self.started = True
907
mblighd64e5702008-04-04 21:39:28 +0000908
mbligh36768f02008-02-22 18:28:33 +0000909 def abort(self):
mblighd5c95802008-03-05 00:33:46 +0000910 if self.monitor:
911 self.monitor.kill()
912 self.done = True
mblighd64e5702008-04-04 21:39:28 +0000913 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000914
915
916 def run(self):
917 if self.cmd:
918 print "agent starting monitor"
mbligh36768f02008-02-22 18:28:33 +0000919 log_file = None
920 if hasattr(self, 'host'):
mblighbb421852008-03-11 22:36:16 +0000921 log_file = os.path.join(RESULTS_DIR, 'hosts',
922 self.host.hostname)
923 self.monitor = RunMonitor(
924 self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
925 log_file = log_file)
mbligh36768f02008-02-22 18:28:33 +0000926
927
928class RepairTask(AgentTask):
mbligh16c722d2008-03-05 00:58:44 +0000929 def __init__(self, host, fail_queue_entry=None):
930 """\
931 fail_queue_entry: queue entry to mark failed if this repair
932 fails.
933 """
mblighd64e5702008-04-04 21:39:28 +0000934 self.create_temp_resultsdir('.repair')
935 cmd = [_autoserv_path , '-R', '-m', host.hostname,
936 '-r', self.temp_results_dir]
mbligh36768f02008-02-22 18:28:33 +0000937 self.host = host
mbligh16c722d2008-03-05 00:58:44 +0000938 self.fail_queue_entry = fail_queue_entry
mblighd64e5702008-04-04 21:39:28 +0000939 super(RepairTask, self).__init__(cmd)
mblighe2586682008-02-29 22:45:46 +0000940
mbligh36768f02008-02-22 18:28:33 +0000941
942 def prolog(self):
943 print "repair_task starting"
944 self.host.set_status('Repairing')
945
946
947 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +0000948 super(RepairTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +0000949 if self.success:
mbligh16c722d2008-03-05 00:58:44 +0000950 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +0000951 else:
mbligh16c722d2008-03-05 00:58:44 +0000952 self.host.set_status('Repair Failed')
953 if self.fail_queue_entry:
954 self.fail_queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +0000955
956
class VerifyTask(AgentTask):
        """\
        Run 'autoserv -v' against a host to check it is usable.  On
        failure a RepairTask is queued; verify output is written into a
        temp dir and moved into the entry's results dir by cleanup().
        """

        def __init__(self, queue_entry=None, host=None):
                # exactly one of queue_entry / host must be given
                assert bool(queue_entry) != bool(host)

                self.host = host or queue_entry.host
                self.queue_entry = queue_entry

                self.create_temp_resultsdir('.verify')
                cmd = [_autoserv_path,'-v','-m',self.host.hostname,
                       '-r', self.temp_results_dir]

                # only a non-metahost entry is failed when repair fails;
                # metahost entries can simply be rescheduled elsewhere
                fail_queue_entry = None
                if queue_entry and not queue_entry.meta_host:
                        fail_queue_entry = queue_entry
                failure_tasks = [RepairTask(self.host, fail_queue_entry)]

                super(VerifyTask, self).__init__(cmd,
                                                 failure_tasks=failure_tasks)


        def prolog(self):
                print "starting verify on %s" % (self.host.hostname)
                if self.queue_entry:
                        self.queue_entry.set_status('Verifying')
                        # start from an empty verify results dir
                        self.queue_entry.clear_results_dir(
                            self.queue_entry.verify_results_dir())
                self.host.set_status('Verifying')


        def cleanup(self):
                if not os.path.exists(self.temp_results_dir):
                        return
                # preserve verify logs in the entry's results dir when the
                # verify succeeded, or when a non-metahost entry failed
                # (metahost failures just requeue, so logs are dropped)
                if self.queue_entry and (self.success or
                                         not self.queue_entry.meta_host):
                        self.move_results()
                super(VerifyTask, self).cleanup()


        def epilog(self):
                super(VerifyTask, self).epilog()

                if self.success:
                        self.host.set_status('Ready')
                elif self.queue_entry:
                        # verify failed: put the entry back on the queue
                        self.queue_entry.requeue()


        def move_results(self):
                """Move verify output into the entry's results dir."""
                assert self.queue_entry is not None
                target_dir = self.queue_entry.verify_results_dir()
                if not os.path.exists(target_dir):
                        os.makedirs(target_dir)
                files = os.listdir(self.temp_results_dir)
                for filename in files:
                        # the autoserv pidfile is scheduler bookkeeping, not
                        # a result - don't copy it over
                        if filename == AUTOSERV_PID_FILE:
                                continue
                        self.force_move(os.path.join(self.temp_results_dir,
                                                     filename),
                                        os.path.join(target_dir, filename))


        @staticmethod
        def force_move(source, dest):
                """\
                Replacement for shutil.move() that will delete the destination
                if it exists, even if it's a directory.
                """
                if os.path.exists(dest):
                        print ('Warning: removing existing destination file ' +
                               dest)
                        remove_file_or_dir(dest)
                shutil.move(source, dest)
1029
1030
class VerifySynchronousTask(VerifyTask):
        """Verify task for one entry of a synchronous (multi-machine) job;
        when every entry reaches 'Pending' it launches the job itself."""

        def __init__(self, queue_entry):
                super(VerifySynchronousTask, self).__init__(
                    queue_entry=queue_entry)


        def epilog(self):
                super(VerifySynchronousTask, self).epilog()
                if not self.success:
                        return
                entry = self.queue_entry
                if entry.job.num_complete() > 0:
                        # a sibling entry already failed verify, so this job
                        # has been stopped - don't move this entry forward
                        return
                entry.set_status('Pending')
                if entry.job.is_ready():
                        # last entry to verify - start the real job now
                        new_agent = entry.job.run(entry)
                        self.agent.dispatcher.add_agent(new_agent)
mblighe2586682008-02-29 22:45:46 +00001050
class QueueTask(AgentTask):
        """\
        Run the actual autoserv job process for a set of queue entries
        and record timing keyvals around it.
        """

        def __init__(self, job, queue_entries, cmd):
                super(QueueTask, self).__init__(cmd)
                self.job = job
                self.queue_entries = queue_entries


        @staticmethod
        def _write_keyval(results_dir, field, value):
                """Append 'field=value' to the keyval file in results_dir."""
                # NOTE(review): %d truncates the float timestamps passed in
                # (time.time()) to whole seconds - confirm that is intended
                key_path = os.path.join(results_dir, 'keyval')
                keyval_file = open(key_path, 'a')
                print >> keyval_file, '%s=%d' % (field, value)
                keyval_file.close()


        def results_dir(self):
                # all entries of one job share the same results dir root
                return self.queue_entries[0].results_dir()


        def run(self):
                """\
                Override AgentTask.run() so we can use a PidfileRunMonitor.
                """
                self.monitor = PidfileRunMonitor(self.results_dir(),
                                                 cmd=self.cmd,
                                                 nice_level=AUTOSERV_NICE_LEVEL)


        def prolog(self):
                # write some job timestamps into the job keyval file
                queued = time.mktime(self.job.created_on.timetuple())
                started = time.time()
                self._write_keyval(self.results_dir(), "job_queued", queued)
                self._write_keyval(self.results_dir(), "job_started", started)
                for queue_entry in self.queue_entries:
                        print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
                        queue_entry.set_status('Running')
                        queue_entry.host.set_status('Running')
                # asynchronous multi-machine jobs get one QueueTask per entry;
                # record this machine in the job's .machines file
                if (not self.job.is_synchronous() and
                    self.job.num_machines() > 1):
                        assert len(self.queue_entries) == 1
                        self.job.write_to_machines_file(self.queue_entries[0])


        def epilog(self):
                super(QueueTask, self).epilog()
                if self.success:
                        status = 'Completed'
                else:
                        status = 'Failed'

                # write another timestamp into the job keyval file
                finished = time.time()
                self._write_keyval(self.results_dir(), "job_finished", finished)
                for queue_entry in self.queue_entries:
                        queue_entry.set_status(status)
                        queue_entry.host.set_status('Ready')

                # kick off results parsing: whole-job for synchronous or
                # single-machine jobs (once all entries are done), otherwise
                # per-entry with '-l 2'
                if self.job.is_synchronous() or self.job.num_machines()==1:
                        if self.job.is_finished():
                                parse_results(self.job.results_dir())
                else:
                        for queue_entry in self.queue_entries:
                                parse_results(queue_entry.results_dir(),
                                              flags='-l 2')

                print "queue_task finished with %s/%s" % (status, self.success)
1118
1119
class RecoveryQueueTask(QueueTask):
        """A QueueTask that re-attaches to an autoserv process that was
        already running when the scheduler (re)started, instead of
        launching a new one."""

        def __init__(self, job, queue_entries, run_monitor):
                super(RecoveryQueueTask, self).__init__(
                    job, queue_entries, cmd=None)
                self.run_monitor = run_monitor


        def run(self):
                # adopt the pre-existing monitor; nothing to spawn
                self.monitor = self.run_monitor


        def prolog(self):
                """No-op: the recovered process already ran its prolog."""
                pass
1134
1135
mbligh36768f02008-02-22 18:28:33 +00001136class RebootTask(AgentTask):
mblighd5c95802008-03-05 00:33:46 +00001137 def __init__(self, host):
1138 global _autoserv_path
1139
1140 # Current implementation of autoserv requires control file
1141 # to be passed on reboot action request. TODO: remove when no
1142 # longer appropriate.
mblighd64e5702008-04-04 21:39:28 +00001143 self.create_temp_resultsdir('.reboot')
mblighd5c95802008-03-05 00:33:46 +00001144 self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
mblighd64e5702008-04-04 21:39:28 +00001145 '-r', self.temp_results_dir, '/dev/null']
mbligh36768f02008-02-22 18:28:33 +00001146 self.host = host
mblighd64e5702008-04-04 21:39:28 +00001147 super(RebootTask, self).__init__(self.cmd,
mbligh16c722d2008-03-05 00:58:44 +00001148 failure_tasks=[RepairTask(host)])
1149
mblighd5c95802008-03-05 00:33:46 +00001150
1151 def prolog(self):
1152 print "starting reboot task for host: %s" % self.host.hostname
1153 self.host.set_status("Rebooting")
1154
mblighd5c95802008-03-05 00:33:46 +00001155
1156class AbortTask(AgentTask):
1157 def __init__(self, queue_entry, agents_to_abort):
1158 self.queue_entry = queue_entry
1159 self.agents_to_abort = agents_to_abort
1160 for agent in agents_to_abort:
1161 agent.dispatcher.remove_agent(agent)
mblighd64e5702008-04-04 21:39:28 +00001162 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001163
1164
mblighd5c95802008-03-05 00:33:46 +00001165 def prolog(self):
1166 print "starting abort on host %s, job %s" % (
1167 self.queue_entry.host_id, self.queue_entry.job_id)
1168 self.queue_entry.set_status('Aborting')
1169
mbligh36768f02008-02-22 18:28:33 +00001170
mblighd5c95802008-03-05 00:33:46 +00001171 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +00001172 super(AbortTask, self).epilog()
mblighd5c95802008-03-05 00:33:46 +00001173 self.queue_entry.set_status('Aborted')
1174 self.success = True
mbligh36768f02008-02-22 18:28:33 +00001175
mblighd64e5702008-04-04 21:39:28 +00001176
mbligh36768f02008-02-22 18:28:33 +00001177 def run(self):
mblighd5c95802008-03-05 00:33:46 +00001178 for agent in self.agents_to_abort:
1179 if (agent.active_task):
1180 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001181
1182
class DBObject(object):
        """\
        Minimal ORM base: maps one row of a table onto attributes and
        offers count/update/save/delete/fetch helpers.  Subclasses must
        override _get_table().
        """

        def __init__(self, fields, id=None, row=None, new_record=False):
                """\
                fields: ordered column names; must match the table exactly.
                id: primary key to load the row from the database, OR
                row: pre-fetched row tuple (exactly one must be given).
                new_record: True if this object is not yet in the database.
                """
                # exactly one of id/row (note: a falsy id such as 0 would
                # trip this assert - ids are assumed to start at 1)
                assert (bool(id) != bool(row)) and fields

                self.__table = self._get_table()
                self.__fields = fields

                self.__new_record = new_record

                if row is None:
                        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
                        rows = _db.execute(sql, (id,))
                        if len(rows) == 0:
                                # fixed: was a string exception ('raise "..."'),
                                # which is a TypeError on Python >= 2.6
                                raise RuntimeError(
                                    "row not found (table=%s, id=%s)"
                                    % (self.__table, id))
                        row = rows[0]

                assert len(row)==len(fields), (
                    "table = %s, row = %s/%d, fields = %s/%d" % (
                    self.__table, row, len(row), fields, len(fields)))

                # expose each column value as an attribute of the same name
                self.__valid_fields = {}
                for i,value in enumerate(row):
                        self.__dict__[fields[i]] = value
                        self.__valid_fields[fields[i]] = True

                # 'id' is immutable - never a valid target for update_field()
                del self.__valid_fields['id']


        @classmethod
        def _get_table(cls):
                raise NotImplementedError('Subclasses must override this')


        def count(self, where, table = None):
                """Return COUNT(*) of rows matching the raw 'where' clause."""
                if not table:
                        table = self.__table

                rows = _db.execute("""
                        SELECT count(*) FROM %s
                        WHERE %s
                """ % (table, where))

                assert len(rows) == 1

                return int(rows[0][0])


        def num_cols(self):
                """Number of columns this object maps."""
                return len(self.__fields)


        def update_field(self, field, value):
                """Set one column in the DB and mirror it on the object."""
                assert self.__valid_fields[field]

                # skip the write when the value is unchanged
                if self.__dict__[field] == value:
                        return

                query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                                                        (self.__table, field)
                _db.execute(query, (value, self.id))

                self.__dict__[field] = value


        def save(self):
                """INSERT this object if it was created with new_record."""
                if self.__new_record:
                        keys = self.__fields[1:] # avoid id
                        columns = ','.join([str(key) for key in keys])
                        # NOTE(review): values are interpolated, not bound as
                        # parameters - safe only for scheduler-generated data
                        values = ['"%s"' % self.__dict__[key] for key in keys]
                        values = ','.join(values)
                        query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                                                (self.__table, columns, values)
                        _db.execute(query)


        def delete(self):
                """DELETE this object's row by primary key."""
                query = 'DELETE FROM %s WHERE id=%%s' % self.__table
                _db.execute(query, (self.id,))


        @classmethod
        def fetch(cls, where, params=()):
                """Yield one instance per row matching the 'where' clause."""
                rows = _db.execute(
                    'SELECT * FROM %s WHERE %s' % (cls._get_table(), where),
                    params)
                for row in rows:
                        yield cls(row=row)
1271
mbligh36768f02008-02-22 18:28:33 +00001272
class IneligibleHostQueue(DBObject):
        """Row wrapper for 'ineligible_host_queues', the table that blocks
        a job from being scheduled again on a given host."""

        def __init__(self, id=None, row=None, new_record=None):
                super(IneligibleHostQueue, self).__init__(
                    ['id', 'job_id', 'host_id'],
                    id=id, row=row, new_record=new_record)


        @classmethod
        def _get_table(cls):
                return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001283
1284
class Host(DBObject):
        """Row wrapper for the 'hosts' table, plus queries for what this
        host is doing and what it could pick up next."""

        def __init__(self, id=None, row=None):
                fields = ['id', 'hostname', 'locked', 'synch_id','status',
                          'invalid']
                super(Host, self).__init__(fields, id=id, row=row)


        @classmethod
        def _get_table(cls):
                return 'hosts'


        def current_task(self):
                """Return the single active, incomplete queue entry on this
                host, or None if the host is idle."""
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                        """, (self.id,))

                if len(rows) == 0:
                        return None
                else:
                        # a host can only be running one entry at a time
                        assert len(rows) == 1
                        results = rows[0];
#                       print "current = %s" % results
                        return HostQueueEntry(row=results)


        def next_queue_entries(self):
                """\
                Return the best candidate queue entry for this host (as a
                one-element list), or None.  An entry qualifies if it names
                this host directly, or its meta_host matches one of the
                host's labels and the job is not blocked for this host in
                ineligible_host_queues.
                """
                if self.locked:
                        print "%s locked, not queuing" % self.hostname
                        return None
#               print "%s/%s looking for work" % (self.hostname, self.platform_id)
                rows = _db.execute("""
                        SELECT * FROM host_queue_entries
                        WHERE ((host_id=%s) OR (meta_host IS NOT null AND
                        (meta_host IN (
                                SELECT label_id FROM hosts_labels WHERE host_id=%s
                                )
                        )
                        AND job_id NOT IN (
                                SELECT job_id FROM ineligible_host_queues
                                WHERE host_id=%s
                        )))
                        AND NOT complete AND NOT active
                        ORDER BY priority DESC, meta_host, id
                        LIMIT 1
                        """, (self.id,self.id, self.id))

                if len(rows) == 0:
                        return None
                else:
                        return [HostQueueEntry(row=i) for i in rows]

        def yield_work(self):
                """Requeue whatever this host is currently working on."""
                print "%s yielding work" % self.hostname
                if self.current_task():
                        self.current_task().requeue()

        def set_status(self,status):
                print '%s -> %s' % (self.hostname, status)
                self.update_field('status',status)
1345
1346
class HostQueueEntry(DBObject):
        """\
        Row wrapper for 'host_queue_entries': one (job, host) pairing,
        with the status state machine and per-entry results handling.
        """

        def __init__(self, id=None, row=None):
                assert id or row
                fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                          'meta_host', 'active', 'complete']
                super(HostQueueEntry, self).__init__(fields, id=id, row=row)
                self.job = Job(self.job_id)

                # meta-host entries have no host until one is assigned
                if self.host_id:
                        self.host = Host(self.host_id)
                else:
                        self.host = None

                self.queue_log_path = os.path.join(self.job.results_dir(),
                                                   'queue.log.' + str(self.id))


        @classmethod
        def _get_table(cls):
                return 'host_queue_entries'


        def set_host(self, host):
                """Assign (or, with host=None, release) a host for this
                entry, updating the queue log and the block table."""
                if host:
                        self.queue_log_record('Assigning host ' + host.hostname)
                        self.update_field('host_id', host.id)
                        self.update_field('active', True)
                        # prevent this job from landing on the host again
                        self.block_host(host.id)
                else:
                        self.queue_log_record('Releasing host')
                        self.unblock_host(self.host.id)
                        self.update_field('host_id', None)

                self.host = host


        def get_host(self):
                return self.host


        def queue_log_record(self, log_line):
                """Append a timestamped line to this entry's queue log."""
                now = str(datetime.datetime.now())
                # buffering=0: write through immediately so the log survives
                # a scheduler crash
                queue_log = open(self.queue_log_path, 'a', 0)
                queue_log.write(now + ' ' + log_line + '\n')
                queue_log.close()


        def block_host(self, host_id):
                """Record a job/host block in ineligible_host_queues."""
                print "creating block %s/%s" % (self.job.id, host_id)
                # leading 0 is the placeholder id for a new record
                row = [0, self.job.id, host_id]
                block = IneligibleHostQueue(row=row, new_record=True)
                block.save()


        def unblock_host(self, host_id):
                """Remove any job/host blocks from ineligible_host_queues."""
                print "removing block %s/%s" % (self.job.id, host_id)
                blocks = IneligibleHostQueue.fetch(
                    'job_id=%d and host_id=%d' % (self.job.id, host_id))
                for block in blocks:
                        block.delete()


        def results_dir(self):
                """Directory this entry's job results go into: the job dir
                itself, or a per-host subdir for async multi-machine jobs."""
                if self.job.is_synchronous() or self.job.num_machines() == 1:
                        return self.job.job_dir
                else:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)


        def verify_results_dir(self):
                """Directory verify logs go into; note the condition is the
                inverse split of results_dir() (per-host for sync jobs)."""
                if self.job.is_synchronous() or self.job.num_machines() > 1:
                        assert self.host
                        return os.path.join(self.job.job_dir,
                                            self.host.hostname)
                else:
                        return self.job.job_dir


        def set_status(self, status):
                """Update status plus the derived active/complete flags."""
                self.update_field('status', status)
                if self.host:
                        hostname = self.host.hostname
                else:
                        hostname = 'no host'
                print "%s/%d status -> %s" % (hostname, self.id, self.status)
                if status in ['Queued']:
                        self.update_field('complete', False)
                        self.update_field('active', False)

                if status in ['Pending', 'Running', 'Verifying', 'Starting',
                              'Abort', 'Aborting']:
                        self.update_field('complete', False)
                        self.update_field('active', True)

                if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
                        self.update_field('complete', True)
                        self.update_field('active', False)


        def run(self,assigned_host=None):
                """Hand this entry to its job; returns the Agent to run."""
                if self.meta_host:
                        assert assigned_host
                        # ensure results dir exists for the queue log
                        self.job.create_results_dir()
                        self.set_host(assigned_host)

                print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                                self.meta_host, self.host.hostname, self.status)

                return self.job.run(queue_entry=self)

        def requeue(self):
                """Put the entry back in the queue; meta-host entries also
                give up their assigned host."""
                self.set_status('Queued')

                if self.meta_host:
                        self.set_host(None)


        def handle_host_failure(self):
                """\
                Called when this queue entry's host has failed verification and
                repair.
                """
                assert not self.meta_host
                self.set_status('Failed')
                if self.job.is_synchronous():
                        # one dead host stops the whole synchronous job
                        self.job.stop_all_entries()


        def clear_results_dir(self, results_dir=None, dont_delete_files=False):
                """Empty the results dir; with dont_delete_files the contents
                are moved to a temp dir instead of being deleted."""
                results_dir = results_dir or self.results_dir()
                if not os.path.exists(results_dir):
                        return
                if dont_delete_files:
                        temp_dir = tempfile.mkdtemp(suffix='.clear_results')
                        print 'Moving results from %s to %s' % (results_dir,
                                                                temp_dir)
                for filename in os.listdir(results_dir):
                        path = os.path.join(results_dir, filename)
                        if dont_delete_files:
                                shutil.move(path,
                                            os.path.join(temp_dir, filename))
                        else:
                                remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001493
1494
1495class Job(DBObject):
1496 def __init__(self, id=None, row=None):
1497 assert id or row
mblighd64e5702008-04-04 21:39:28 +00001498 super(Job, self).__init__(
mblighe2586682008-02-29 22:45:46 +00001499 ['id','owner','name','priority',
1500 'control_file','control_type','created_on',
1501 'synch_type', 'synch_count','synchronizing'],
1502 id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00001503
mblighe2586682008-02-29 22:45:46 +00001504 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
1505 self.owner))
1506
1507
1508 @classmethod
1509 def _get_table(cls):
1510 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00001511
1512
1513 def is_server_job(self):
1514 return self.control_type != 2
1515
1516
1517 def get_host_queue_entries(self):
mbligh6f8bab42008-02-29 22:45:14 +00001518 rows = _db.execute("""
mbligh36768f02008-02-22 18:28:33 +00001519 SELECT * FROM host_queue_entries
1520 WHERE job_id= %s
1521 """, (self.id,))
mbligh6f8bab42008-02-29 22:45:14 +00001522 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00001523
1524 assert len(entries)>0
1525
1526 return entries
1527
1528
1529 def set_status(self, status, update_queues=False):
1530 self.update_field('status',status)
1531
1532 if update_queues:
1533 for queue_entry in self.get_host_queue_entries():
1534 queue_entry.set_status(status)
1535
1536
1537 def is_synchronous(self):
1538 return self.synch_type == 2
1539
1540
1541 def is_ready(self):
1542 if not self.is_synchronous():
1543 return True
1544 sql = "job_id=%s AND status='Pending'" % self.id
1545 count = self.count(sql, table='host_queue_entries')
1546 return (count == self.synch_count)
1547
1548
1549 def ready_to_synchronize(self):
1550 # heuristic
1551 queue_entries = self.get_host_queue_entries()
1552 count = 0
1553 for queue_entry in queue_entries:
1554 if queue_entry.status == 'Pending':
1555 count += 1
1556
1557 return (count/self.synch_count >= 0.5)
1558
1559
1560 def start_synchronizing(self):
1561 self.update_field('synchronizing', True)
1562
1563
1564 def results_dir(self):
1565 return self.job_dir
1566
1567 def num_machines(self, clause = None):
1568 sql = "job_id=%s" % self.id
1569 if clause:
1570 sql += " AND (%s)" % clause
1571 return self.count(sql, table='host_queue_entries')
1572
1573
1574 def num_queued(self):
1575 return self.num_machines('not complete')
1576
1577
1578 def num_active(self):
1579 return self.num_machines('active')
1580
1581
1582 def num_complete(self):
1583 return self.num_machines('complete')
1584
1585
1586 def is_finished(self):
1587 left = self.num_queued()
1588 print "%s: %s machines left" % (self.name, left)
1589 return left==0
1590
1591 def stop_synchronizing(self):
1592 self.update_field('synchronizing', False)
1593 self.set_status('Queued', update_queues = False)
1594
1595
mblighe2586682008-02-29 22:45:46 +00001596 def stop_all_entries(self):
1597 for child_entry in self.get_host_queue_entries():
1598 if not child_entry.complete:
1599 child_entry.set_status('Stopped')
1600
1601
1602 def write_to_machines_file(self, queue_entry):
1603 hostname = queue_entry.get_host().hostname
1604 print "writing %s to job %s machines file" % (hostname, self.id)
1605 file_path = os.path.join(self.job_dir, '.machines')
1606 mf = open(file_path, 'a')
1607 mf.write("%s\n" % queue_entry.get_host().hostname)
1608 mf.close()
mbligh36768f02008-02-22 18:28:33 +00001609
1610
1611 def create_results_dir(self, queue_entry=None):
1612 print "create: active: %s complete %s" % (self.num_active(),
1613 self.num_complete())
1614
1615 if not os.path.exists(self.job_dir):
1616 os.makedirs(self.job_dir)
1617
1618 if queue_entry:
1619 return queue_entry.results_dir()
1620 return self.job_dir
1621
1622
1623 def run(self, queue_entry):
1624 results_dir = self.create_results_dir(queue_entry)
1625
1626 if self.is_synchronous():
1627 if not self.is_ready():
mblighd5c95802008-03-05 00:33:46 +00001628 return Agent([VerifySynchronousTask(
1629 queue_entry = queue_entry)],
1630 [queue_entry.id])
mbligh36768f02008-02-22 18:28:33 +00001631
1632 queue_entry.set_status('Starting')
1633
1634 ctrl = open(os.tmpnam(), 'w')
1635 if self.control_file:
1636 ctrl.write(self.control_file)
1637 else:
1638 ctrl.write("")
1639 ctrl.flush()
1640
1641 if self.is_synchronous():
mbligh36768f02008-02-22 18:28:33 +00001642 queue_entries = self.get_host_queue_entries()
1643 else:
1644 assert queue_entry
mbligh36768f02008-02-22 18:28:33 +00001645 queue_entries = [queue_entry]
mblighe2586682008-02-29 22:45:46 +00001646 hostnames = ','.join([entry.get_host().hostname
1647 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00001648
mbligh6437ff52008-04-17 15:24:38 +00001649 # determine the job tag
1650 if self.is_synchronous() or self.num_machines() == 1:
1651 job_name = "%s-%s" % (self.id, self.owner)
1652 else:
1653 job_name = "%s-%s/%s" % (self.id, self.owner,
1654 hostnames)
1655
1656 params = [_autoserv_path, '-P', job_name, '-p', '-n',
mblighbb421852008-03-11 22:36:16 +00001657 '-r', os.path.abspath(results_dir),
1658 '-b', '-u', self.owner, '-l', self.name,
1659 '-m', hostnames, ctrl.name]
mbligh36768f02008-02-22 18:28:33 +00001660
1661 if not self.is_server_job():
1662 params.append('-c')
1663
1664 tasks = []
1665 if not self.is_synchronous():
1666 tasks.append(VerifyTask(queue_entry))
mblighe2586682008-02-29 22:45:46 +00001667
1668 tasks.append(QueueTask(job = self,
1669 queue_entries = queue_entries,
1670 cmd = params))
mbligh36768f02008-02-22 18:28:33 +00001671
mblighd5c95802008-03-05 00:33:46 +00001672 ids = []
1673 for entry in queue_entries:
1674 ids.append(entry.id)
1675
1676 agent = Agent(tasks, ids)
mbligh36768f02008-02-22 18:28:33 +00001677
1678 return agent
1679
1680
# Script entry point (main() is defined near the top of this file).
if __name__ == '__main__':
        main()