#!/usr/bin/python -u

"""
Autotest scheduler
"""
__author__ = "Paul Turner <pjt@google.com>"

import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
import optparse, signal, smtplib, socket, datetime, stat, pwd
from common import global_config

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False


def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--notify', help='Set an email address to be ' +
                      'notified of exceptions')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    global _notify_email
    _notify_email = options.notify

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


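# An idle host is one with no active queue entry that either is directly
# assigned a queued entry or matches the label of a queued metahost entry,
# is not locked, and has a NULL or 'Ready' status.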
def idle_hosts():
    rows = _db.execute("""
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
                    INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND (h.status IS null OR h.status='Ready') """)
    hosts = [Host(row=i) for i in rows]
    return hosts


def queue_entries_to_abort():
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)


class DatabaseConn:
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        import MySQLdb.converters
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)

        self.connect()


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self):
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        DB_HOST = c.get_config_value(dbase, "host")
        DB_SCHEMA = c.get_config_value(dbase, "database")

        global _testing_mode
        if _testing_mode:
            DB_SCHEMA = 'stresstest_autotest_web'

        DB_USER = c.get_config_value(dbase, "user")
        DB_PASS = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=DB_HOST, user=DB_USER, passwd=DB_PASS,
                    db=DB_SCHEMA, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        while (True):
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()


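# Build the shell command that hands a results directory to the TKO parser
# in the background; extra parser flags are passed through 'flags' (the
# callers below use '-n' and '-l 2').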
def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)


def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    os.system(generate_parse_command(results_dir, flags))


def send_notify_email(subject, message):
    if not _notify_email:
        return

    message = "%s / %s / %s\n%s" % (socket.gethostname(), os.getpid(),
                                    time.strftime("%X %x"), message)
    sender = pwd.getpwuid(os.getuid())[0]  # see os.getlogin() online docs
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        sender, _notify_email, subject, message)
    mailer = smtplib.SMTP('localhost')
    mailer.sendmail(sender, _notify_email, msg)
    mailer.quit()


def log_stacktrace(reason):
    (type, value, tb) = sys.exc_info()
    str = "EXCEPTION: %s\n" % reason
    str += ''.join(traceback.format_exception(type, value, tb))

    sys.stderr.write("\n%s\n" % str)
    send_notify_email("monitor_db exception", str)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0  # we can't get a real exit code
    return poll_fn


def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() == None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)


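# The Dispatcher owns the main scheduling loop.  Each tick() it drops the
# cached ps snapshot, aborts queue entries marked 'Abort', hands queued work
# to idle hosts, and advances every active Agent.  On startup,
# do_initial_recovery() reattaches to (or kills and requeues) autoserv
# processes left over from a previous scheduler run.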
class Dispatcher:
    autoserv_procs_cache = None

    def __init__(self):
        self._agents = []


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        Dispatcher.autoserv_procs_cache = None
        self._find_aborting()
        self._find_more_work()
        self._handle_agents()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes. If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the five columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans
            if orphans_only and proc[2] != '1':
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and  # comm
                proc[1] == proc[0]):       # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs


    def recover_queue_entry(self, queue_entry, run_monitor):
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host = queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')
            rebooting_host_ids.add(queue_host.id)

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                      status = 'Verifying' OR
                                      status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND ' +
                           where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _find_more_work(self):
        print "finding work"

        num_started = 0
        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if tasks:
                for next in tasks:
                    try:
                        agent = next.run(assigned_host=host)
                        if agent:
                            self.add_agent(agent)

                            num_started += 1
                            if num_started >= 100:
                                return
                        break
                    except:
                        next.set_status('Failed')

#                        if next.host:
#                            next.host.set_status('Ready')

                        log_stacktrace("task_id = %d" % next.id)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        still_running = []
        for agent in self._agents:
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running


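# RunMonitor wraps a single child process (normally autoserv): it starts the
# command via subprocess, optionally under 'nice' and with output appended to
# a log file, and exposes get_pid()/exit_code()/kill() to the owning task.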
class RunMonitor(object):
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" % (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except:
                pass

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """


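# PidfileRunMonitor tracks autoserv through the .autoserv_execute pidfile it
# is expected to write in the results directory: line one holds the pid, and
# a second line with the exit status appears once the run finishes (see
# read_pidfile()).  With cmd=None it reattaches to an already-running
# process instead of launching a new one, which is how recovery works after
# a scheduler restart.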
class PidfileRunMonitor(RunMonitor):
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            RunMonitor.__init__(self, cmd, nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self, pid):
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def read_pidfile(self):
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise Exception('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status


    def _find_autoserv_proc(self):
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
           None, None if autoserv has not yet run
           pid,  None if autoserv is running
           pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                send_notify_email(error, message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            send_notify_email('Process has failed to write pidfile',
                              message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1


    def exit_code(self):
        pid, exit_code = self.get_pidfile_info()
        return exit_code


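# An Agent is a FIFO of AgentTasks run one at a time by the dispatcher; when
# a task fails, its failure_tasks replace the remaining queue (see
# on_task_failure).  A typical agent built elsewhere in this file looks like:
#     Agent(tasks=[reboot_task, verify_task], queue_entry_ids=[entry.id])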
class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[]):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_done(self):
        return self.active_task == None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


class AgentTask(object):
    def __init__(self, cmd, failure_tasks = []):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code == None:
            return
#        print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def epilog(self):
        pass


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
                log_file = log_file)


class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        cmd = [_autoserv_path, '-R', '-m', host.hostname]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        AgentTask.__init__(self, cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.temp_results_dir = tempfile.mkdtemp(suffix='.verify')
        cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
               '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        AgentTask.__init__(self, cmd, failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def epilog(self):
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        shutil.rmtree(self.temp_results_dir)

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self.force_move(os.path.join(self.temp_results_dir,
                                         filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' +
                   dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def __init__(self, queue_entry):
        VerifyTask.__init__(self, queue_entry = queue_entry)


    def epilog(self):
        VerifyTask.epilog(self)
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            self.queue_entry.set_status('Pending')
            job = self.queue_entry.job
            if job.is_ready():
                agent = job.run(self.queue_entry)
                self.agent.dispatcher.add_agent(agent)

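# QueueTask runs the actual autoserv invocation for one or more queue
# entries, stamping job_queued/job_started/job_finished keyvals and kicking
# off the TKO parser when the job finishes.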
class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        AgentTask.__init__(self, cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(results_dir, field, value):
        key_path = os.path.join(results_dir, 'keyval')
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%d' % (field, value)
        keyval_file.close()


    def results_dir(self):
        return self.queue_entries[0].results_dir()


    def run(self):
        """\
        Override AgentTask.run() so we can use a PidfileRunMonitor.
        """
        self.monitor = PidfileRunMonitor(self.results_dir(),
                                         cmd=self.cmd,
                                         nice_level=AUTOSERV_NICE_LEVEL)


    def prolog(self):
        # write the parser commands into the results directories
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            results_dir = self.job.results_dir()
            cmdfile = os.path.join(results_dir, '.parse.cmd')
            cmd = generate_parse_command(results_dir, '-n')
            print >> open(cmdfile, 'w'), cmd
        else:
            for queue_entry in self.queue_entries:
                results_dir = queue_entry.results_dir()
                cmdfile = os.path.join(results_dir,
                                       '.parse.cmd')
                cmd = generate_parse_command(results_dir,
                                             '-l 2 -n')
                print >> open(cmdfile, 'w'), cmd

        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.results_dir(), "job_queued", queued)
        self._write_keyval(self.results_dir(), "job_started", started)
        for queue_entry in self.queue_entries:
            print "starting queue_task on %s/%s" % (
                queue_entry.host.hostname, queue_entry.id)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.results_dir(), "job_finished", finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(),
                              flags='-l 2')

        print "queue_task finished with %s/%s" % (status, self.success)


class RecoveryQueueTask(QueueTask):
    def __init__(self, job, queue_entries, run_monitor):
        QueueTask.__init__(self, job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass


class RebootTask(AgentTask):
    def __init__(self, host):
        global _autoserv_path

        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '/dev/null']
        self.host = host
        AgentTask.__init__(self, self.cmd,
                           failure_tasks=[RepairTask(host)])


    def prolog(self):
        print "starting reboot task for host: %s" % self.host.hostname
        self.host.set_status("Rebooting")


class AbortTask(AgentTask):
    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for agent in agents_to_abort:
            agent.dispatcher.remove_agent(agent)
        AgentTask.__init__(self, '')


    def prolog(self):
        print "starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id)
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        self.queue_entry.set_status('Aborted')
        self.success = True

    def run(self):
        for agent in self.agents_to_abort:
            if (agent.active_task):
                agent.active_task.abort()


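# DBObject is a minimal row-to-object mapper: a subclass names its table via
# _get_table(), and each column listed in 'fields' becomes an attribute on
# the instance.  update_field() writes a single column back, so e.g.
# Host(id=...).set_status('Ready') issues one UPDATE against the hosts table.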
class DBObject(object):
    def __init__(self, fields, id=None, row=None, new_record=False):
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                raise Exception("row not found (table=%s, id=%s)" %
                                (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table = None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        return len(self.__fields)


    def update_field(self, field, value):
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        if self.__new_record:
            keys = self.__fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values = ','.join(values)
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, values)
            _db.execute(query)


    def delete(self):
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where):
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where))
        for row in rows:
            yield cls(row=row)


class IneligibleHostQueue(DBObject):
    def __init__(self, id=None, row=None, new_record=None):
        fields = ['id', 'job_id', 'host_id']
        DBObject.__init__(self, fields, id=id, row=row,
                          new_record=new_record)


    @classmethod
    def _get_table(cls):
        return 'ineligible_host_queues'


class Host(DBObject):
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status']
        DBObject.__init__(self, fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        return 'hosts'


    def current_task(self):
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0]
#            print "current = %s" % results
            return HostQueueEntry(row=results)


    def next_queue_entries(self):
        if self.locked:
            print "%s locked, not queuing" % self.hostname
            return None
#        print "%s/%s looking for work" % (self.hostname, self.platform_id)
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE ((host_id=%s) OR (meta_host IS NOT null AND
                (meta_host IN (
                        SELECT label_id FROM hosts_labels WHERE host_id=%s
                        )
                )
                AND job_id NOT IN (
                        SELECT job_id FROM ineligible_host_queues
                        WHERE host_id=%s
                )))
                AND NOT complete AND NOT active
                ORDER BY priority DESC, meta_host, id
                LIMIT 1
                """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        else:
            return [HostQueueEntry(row=i) for i in rows]

    def yield_work(self):
        print "%s yielding work" % self.hostname
        if self.current_task():
            self.current_task().requeue()

    def set_status(self, status):
        print '%s -> %s' % (self.hostname, status)
        self.update_field('status', status)


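# A HostQueueEntry is one host's slot in a job.  set_status() keeps the
# 'active' and 'complete' flags in sync with the status string, which is
# what the idle-host and recovery queries above key off.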
class HostQueueEntry(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        DBObject.__init__(self, fields, id=id, row=row)

        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        return 'host_queue_entries'


    def set_host(self, host):
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        queue_log.write(now + ' ' + log_line + '\n')
        queue_log.close()


    def block_host(self, host_id):
        print "creating block %s/%s" % (self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        print "removing block %s/%s" % (self.job.id, host_id)
        blocks = list(IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id)))
        assert len(blocks) == 1
        blocks[0].delete()


    def results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print "%s/%d status -> %s" % (hostname, self.id, self.status)
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self, assigned_host=None):
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
                self.meta_host, self.host.hostname, self.status)

        return self.job.run(queue_entry=self)

    def requeue(self):
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None, dont_delete_files=False):
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        if dont_delete_files:
            temp_dir = tempfile.mkdtemp(suffix='.clear_results')
            print 'Moving results from %s to %s' % (results_dir,
                                                    temp_dir)
        for filename in os.listdir(results_dir):
            path = os.path.join(results_dir, filename)
            if dont_delete_files:
                shutil.move(path,
                            os.path.join(temp_dir, filename))
            else:
                remove_file_or_dir(path)


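# Job mirrors a row of the jobs table.  synch_type == 2 marks a synchronous
# job (every entry must reach 'Pending' before one autoserv run covers them
# all); asynchronous jobs get one autoserv run per queue entry.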
class Job(DBObject):
    def __init__(self, id=None, row=None):
        assert id or row
        DBObject.__init__(self,
                          ['id', 'owner', 'name', 'priority',
                           'control_file', 'control_type', 'created_on',
                           'synch_type', 'synch_count', 'synchronizing'],
                          id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        return 'jobs'


    def is_server_job(self):
        return self.control_type != 2


    def get_host_queue_entries(self):
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        return self.synch_type == 2


    def is_ready(self):
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        # heuristic
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (count / self.synch_count >= 0.5)


    def start_synchronizing(self):
        self.update_field('synchronizing', True)


    def results_dir(self):
        return self.job_dir

    def num_machines(self, clause = None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        left = self.num_queued()
        print "%s: %s machines left" % (self.name, left)
        return left == 0

    def stop_synchronizing(self):
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues = False)


    def stop_all_entries(self):
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.get_host().hostname
        print "writing %s to job %s machines file" % (hostname, self.id)
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        mf.write("%s\n" % queue_entry.get_host().hostname)
        mf.close()


    def create_results_dir(self, queue_entry=None):
        print "create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete())

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry=queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        ctrl = open(os.tmpnam(), 'w')
        if self.control_file:
            ctrl.write(self.control_file)
        else:
            ctrl.write("")
        ctrl.flush()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-p', '-n',
                  '-r', os.path.abspath(results_dir),
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl.name]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job = self,
                               queue_entries = queue_entries,
                               cmd = params))

        ids = []
        for entry in queue_entries:
            ids.append(entry.id)

        agent = Agent(tasks, ids)

        return agent


if __name__ == '__main__':
    main()