blob: f6887ad0da967ec1862a802fad18c4b024eb5c49 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighcadb3532008-04-15 17:46:26 +00009import optparse, signal, smtplib, socket, datetime, stat, pwd, errno
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Directory that autoserv writes job results into; overridden by the
# positional command-line argument in main().
RESULTS_DIR = '.'
# nice(1) level applied to spawned autoserv processes
AUTOSERV_NICE_LEVEL = 10

# default to the autotest tree one level above this scheduler script
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# allow an explicit override of the autotest root via the environment
# (membership test instead of the deprecated dict.has_key())
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# name of the pidfile autoserv writes inside its results directory
AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000028
# Module-global scheduler state; populated by main()/init() at startup.
_db = None                    # shared DatabaseConn instance (set in init())
_shutdown = False             # set by the SIGINT handler to stop the main loop
_notify_email = None          # notification address read from global_config
_autoserv_path = 'autoserv'   # binary to launch ('autoserv_dummy' under --test)
_testing_mode = False         # True when running with --test
_global_config_section = 'SCHEDULER'
mbligh36768f02008-02-22 18:28:33 +000035
36
def main():
    """Entry point: parse options, initialize, and run the dispatcher loop.

    Expects exactly one positional argument (the results directory).
    Ticks the dispatcher every 20 seconds until SIGINT sets _shutdown,
    then flushes queued notification email and disconnects from MySQL.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    if options.test:
        # under test: run a stub autoserv and skip results parsing
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        # top-level catch-all: log the traceback, then fall through
        # to the clean-shutdown path below
        log_stacktrace("Uncaught exception; terminating monitor_db")

    email_manager.send_queued_emails()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +000083
84
def handle_sigint(signum, frame):
    # SIGINT handler: flag the main loop to exit after the current tick.
    global _shutdown
    _shutdown = True
    print "Shutdown request received."
89
90
def init(logfile):
    """One-time process setup: logging, PATH, DB connection, signal handler."""
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    # make autoserv (and other server-side tools) findable by children
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()
    _db.connect()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."
106
107
108def enable_logging(logfile):
109 out_file = logfile
110 err_file = "%s.err" % logfile
111 print "Enabling logging to %s (%s)" % (out_file, err_file)
112 out_fd = open(out_file, "a", buffering=0)
113 err_fd = open(err_file, "a", buffering=0)
114
115 os.dup2(out_fd.fileno(), sys.stdout.fileno())
116 os.dup2(err_fd.fileno(), sys.stderr.fileno())
117
118 sys.stdout = out_fd
119 sys.stderr = err_fd
120
121
def queue_entries_to_abort():
    """Fetch a HostQueueEntry for every queue entry marked 'Abort'."""
    rows = _db.execute("""
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000128
def remove_file_or_dir(path):
    """Delete path: recursively when it is a directory, otherwise as a file.

    Raises OSError if path does not exist (os.stat fails).
    """
    mode = os.stat(path).st_mode
    if stat.S_ISDIR(mode):
        shutil.rmtree(path)
        return
    os.remove(path)
136
137
class DatabaseConn:
    """Thin auto-reconnecting wrapper around a MySQLdb connection.

    connect() and execute() both retry forever on OperationalError,
    sleeping reconnect_wait seconds between attempts.
    """
    def __init__(self):
        # seconds to sleep between reconnection attempts
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        import MySQLdb.converters
        # teach MySQLdb to serialize Python bools as '0'/'1' strings
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self, db_name=None):
        """(Re)connect to the AUTOTEST_WEB database, retrying forever.

        db_name overrides the configured database name; testing mode
        forces the stress-test database regardless.
        """
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        db_host = c.get_config_value(dbase, "host")
        if db_name is None:
            db_name = c.get_config_value(dbase, "database")

        if _testing_mode:
            db_name = 'stresstest_autotest_web'

        db_user = c.get_config_value(dbase, "user")
        db_pass = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=db_host, user=db_user, passwd=db_pass,
                    db=db_name, conv=self.convert_dict)

                # each statement commits immediately; the scheduler
                # never uses multi-statement transactions
                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        """Close the connection (if any) and clear connection state."""
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        """Run a query and return all rows, reconnecting and retrying
        (forever) if the MySQL connection has died."""
        while (True):
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()
mbligh36768f02008-02-22 18:28:33 +0000203
204
def generate_parse_command(results_dir, flags=""):
    """Build the backgrounded shell command that runs the TKO parser on
    results_dir, sending parser output to .parse.log in that directory."""
    parser_path = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    log_path = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    template = "%s %s -r -o %s > %s 2>&1 &"
    return template % (parser_path, flags, results_dir, log_path)
210
211
def parse_results(results_dir, flags=""):
    """Kick off the TKO results parser in the background.

    No-op in testing mode, where parsing is deliberately skipped.
    """
    if _testing_mode:
        return
    parse_cmd = generate_parse_command(results_dir, flags)
    os.system(parse_cmd)
mbligh36768f02008-02-22 18:28:33 +0000216
217
mblighbb421852008-03-11 22:36:16 +0000218
219
def log_stacktrace(reason):
    """Log the currently-handled exception to stderr and queue a
    notification email about it.

    Must be called from inside an except block (uses sys.exc_info()).
    reason is a short human-readable description of the context.
    """
    # renamed locals: the originals shadowed the builtins type() and str()
    exc_type, exc_value, exc_tb = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, exc_value,
                                                  exc_tb))

    sys.stderr.write("\n%s\n" % message)
    email_manager.enqueue_notify_email("monitor_db exception", message)
mbligh36768f02008-02-22 18:28:33 +0000227
mblighbb421852008-03-11 22:36:16 +0000228
def get_proc_poll_fn(pid):
    """Build a subprocess.Popen.poll()-style callable for a foreign pid.

    The returned function reports None while /proc/<pid> exists, and 0
    once it is gone -- the real exit status of a process we did not
    spawn is unknowable.
    """
    proc_entry = os.path.join('/proc', str(pid))

    def poll_fn():
        if not os.path.exists(proc_entry):
            return 0  # process is gone; real exit code unavailable
        return None
    return poll_fn
236
237
238def kill_autoserv(pid, poll_fn=None):
239 print 'killing', pid
240 if poll_fn is None:
241 poll_fn = get_proc_poll_fn(pid)
242 if poll_fn() == None:
243 os.kill(pid, signal.SIGCONT)
244 os.kill(pid, signal.SIGTERM)
mbligh36768f02008-02-22 18:28:33 +0000245
246
class EmailNotificationManager(object):
    """Collects notification messages during a scheduler cycle and mails
    them to the configured _notify_email address in a single batch."""

    def __init__(self):
        self._emails = []
        # see os.getlogin() online docs: it requires a controlling
        # terminal, so resolve the username via the password database
        self._sender = pwd.getpwuid(os.getuid())[0]


    def enqueue_notify_email(self, subject, message):
        """Queue one notification; silently dropped when no notification
        address is configured."""
        if not _notify_email:
            return

        header = 'Subject: ' + subject + '\n'
        details = "%s / %s / %s\n%s" % (socket.gethostname(),
                                        os.getpid(),
                                        time.strftime("%X %x"), message)
        self._emails.append(header + details)


    def send_queued_emails(self):
        """Send everything queued so far as one email via local SMTP,
        then clear the queue.  No-op when nothing is queued."""
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            self._sender, _notify_email, subject,
            separator.join(self._emails))

        smtp = smtplib.SMTP('localhost')
        smtp.sendmail(self._sender, _notify_email, msg)
        smtp.quit()
        self._emails = []
278
# module-level singleton used throughout the scheduler to batch email
email_manager = EmailNotificationManager()
280
281
class Dispatcher:
    """Core scheduler loop: recovers state at startup, then each tick()
    aborts, schedules, and drives Agents that run jobs on hosts."""
    # per-tick cache of the output of find_autoservs(); reset every tick
    autoserv_procs_cache = None
    # concurrency limits, read once from the SCHEDULER config section
    max_running_agents = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_jobs_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))

    def __init__(self):
        self._agents = []


    def do_initial_recovery(self, recover_hosts=True):
        """Startup-time recovery of orphaned processes and (optionally)
        dead hosts."""
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        """One scheduler cycle: abort, schedule, run agents, clean up."""
        Dispatcher.autoserv_procs_cache = None
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        self._clear_inactive_blocks()
        email_manager.send_queued_emails()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find agent corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def num_started_agents(self):
        # count of agents that have begun executing tasks
        return len([agent for agent in self._agents
                    if agent.is_started()])


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes.  If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            # NOTE(review): the cache ignores orphans_only -- a cached
            # orphans-only result can be returned for a full query (and
            # vice versa) within the same tick; confirm this is intended.
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the four columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans
            # NOTE(review): ps fields are strings, so proc[2] != 1 is
            # always true -- with orphans_only=True every process gets
            # skipped; looks like this should compare against '1'.
            if orphans_only and proc[2] != 1:
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and  # comm
                proc[1] == proc[0]):  # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs


    def recover_queue_entry(self, queue_entry, run_monitor):
        """Re-attach an Agent to a queue entry whose autoserv process is
        still running (or has finished) from a previous scheduler run."""
        job = queue_entry.job
        if job.is_synchronous():
            # a synchronous job's entries are all driven by one task
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        """Reconcile DB state with live autoserv processes after a
        scheduler restart: re-attach, requeue, or kill as appropriate."""
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                # mark all sibling entries recovered in one shot
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            # a recovered process is not an orphan to kill
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host = queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')
            rebooting_host_ids.add(queue_host.id)

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        # Queue a VerifyTask for every unlocked, valid host matching the
        # extra WHERE clause, skipping ids in exclude_ids.
        # NOTE(review): exclude_ids has a mutable default; harmless here
        # (only membership-tested, never mutated), but fragile.
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _extract_host_and_queue_entry(self, row):
        """Split one joined result row into (Host, HostQueueEntry)."""
        # each row contains host columns followed by host queue entry
        # columns
        num_host_cols = Host.num_cols()
        assert len(row) == num_host_cols + HostQueueEntry.num_cols()
        host = Host(row=row[:num_host_cols])
        queue_entry = HostQueueEntry(row=row[num_host_cols:])
        return host, queue_entry


    def _get_runnable_entries(self, extra_join='', extra_where=''):
        """Run the host-vs-queued-entry matching query and return
        (host, queue_entry) pairs, ordered by priority then entry id.
        extra_join must define the queued_hqe alias used below."""
        query = (
            'SELECT DISTINCT h.*, queued_hqe.* FROM hosts h '
            # join with running entries
            """
            LEFT JOIN host_queue_entries AS active_hqe
            ON (h.id = active_hqe.host_id AND active_hqe.active)
            """ +
            extra_join +
            # exclude hosts with a running entry
            'WHERE active_hqe.host_id IS NULL '
            # exclude locked, invalid, and non-Ready hosts
            """
            AND h.locked=false AND h.invalid=false
            AND (h.status IS null OR h.status='Ready')
            """)
        if extra_where:
            query += 'AND ' + extra_where + '\n'
        # respect priority, then sort by ID (most recent first)
        query += 'ORDER BY queued_hqe.priority DESC, queued_hqe.id'

        rows = _db.execute(query)
        return [self._extract_host_and_queue_entry(row) for row in rows]


    def _get_runnable_nonmetahosts(self):
        # find queued HQEs scheduled directly against hosts
        queued_hqe_join = """
            INNER JOIN host_queue_entries AS queued_hqe
            ON (h.id = queued_hqe.host_id
                AND NOT queued_hqe.active AND NOT queued_hqe.complete)
            """
        return self._get_runnable_entries(queued_hqe_join)


    def _get_runnable_metahosts(self):
        """Match idle hosts to queued metahost entries by label, filtering
        out ACL-inaccessible and job-blocked hosts."""
        # join with labels for metahost matching
        labels_join = 'INNER JOIN hosts_labels hl ON (hl.host_id=h.id)'
        # find queued HQEs scheduled for metahosts that match idle hosts
        queued_hqe_join = """
            INNER JOIN host_queue_entries AS queued_hqe
            ON (queued_hqe.meta_host = hl.label_id
                AND queued_hqe.host_id IS NULL
                AND NOT queued_hqe.active AND NOT queued_hqe.complete)
            """
        # need to exclude acl-inaccessible hosts
        acl_join = """
            INNER JOIN acl_groups_hosts ON h.id=acl_groups_hosts.host_id
            INNER JOIN acl_groups_users
            ON acl_groups_users.acl_group_id=acl_groups_hosts.acl_group_id
            INNER JOIN users ON acl_groups_users.user_id=users.id
            INNER JOIN jobs
            ON users.login=jobs.owner AND jobs.id=queued_hqe.job_id
            """
        # need to exclude blocked hosts
        block_join = """
            LEFT JOIN ineligible_host_queues AS ihq
            ON (ihq.job_id=queued_hqe.job_id AND ihq.host_id=h.id)
            """
        block_where = 'ihq.id IS NULL'
        extra_join = '\n'.join([labels_join, queued_hqe_join,
                                acl_join, block_join])
        return self._get_runnable_entries(extra_join,
                                          extra_where=block_where)


    def _schedule_new_jobs(self):
        """Create an Agent for each matchable (host, queue entry) pair;
        non-metahost entries take precedence over metahost ones."""
        print "finding work"

        scheduled_hosts, scheduled_queue_entries = set(), set()
        runnable = (self._get_runnable_nonmetahosts() +
                    self._get_runnable_metahosts())
        for host, queue_entry in runnable:
            # we may get duplicate entries for a host or a queue
            # entry.  we need to schedule each host and each queue
            # entry only once.
            if (host.id in scheduled_hosts or
                queue_entry.id in scheduled_queue_entries):
                continue
            agent = queue_entry.run(assigned_host=host)
            self.add_agent(agent)
            scheduled_hosts.add(host.id)
            scheduled_queue_entries.add(queue_entry.id)


    def _find_aborting(self):
        """Abort up to 50 queue entries per tick, rebooting and
        reverifying each affected host."""
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                # the AbortTask will flip statuses once the abort lands
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        """Tick every agent, starting new ones only while under both the
        total-running and started-this-cycle limits."""
        still_running = []
        num_started = self.num_started_agents()
        start_new = (num_started < self.max_running_agents)
        num_started_this_cycle = 0
        for agent in self._agents:
            if not agent.is_started():
                if not start_new:
                    # over the limit: keep the agent queued for later
                    still_running.append(agent)
                    continue
                num_started += 1
                num_started_this_cycle += 1
                if (num_started >= self.max_running_agents or
                    num_started_this_cycle >=
                    self.max_jobs_started_per_cycle):
                    start_new = False
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print "agent finished"
        self._agents = still_running
        print num_started, 'running agents'
mbligh36768f02008-02-22 18:28:33 +0000643
644
class RunMonitor(object):
    """Launches a subprocess (optionally niced, with output appended to a
    log file) and tracks it via the Popen handle."""
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)

    def run(self, cmd):
        """Start cmd and return its subprocess.Popen object.

        stdout/stderr go to log_file (or /dev/null when none is usable);
        stdin comes from /dev/null.
        """
        if self.nice_level:
            nice_cmd = ['nice','-n', str(self.nice_level)]
            nice_cmd.extend(cmd)
            cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                # an existing directory is fine; anything else is logged
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                # banner separating runs within the shared log file
                out_file.write("\n%s\n" % ('*'*80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"), cmd))
                out_file.write("%s\n" % ('*'*80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            # fall back to discarding output entirely
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % cmd
        print "path = %s" % os.getcwd()

        proc = subprocess.Popen(cmd, stdout=out_file,
                                stderr=subprocess.STDOUT, stdin=in_devnull)
        # the child holds its own descriptors; close our copies
        out_file.close()
        in_devnull.close()
        return proc


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        # exit_code is passed as the poll_fn for kill_autoserv
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        # None while running, else the process's exit status
        return self.proc.poll()
700
701
class PidfileException(Exception):
    """Raised when the autoserv pid file is in an unexpected state."""
706
707
class PidfileRunMonitor(RunMonitor):
    """RunMonitor variant that tracks autoserv through the pidfile it
    writes into its results directory, allowing the scheduler to
    re-attach to processes it did not spawn."""
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        # set once we give up on a process that died without reporting
        self.lost_process = False
        # used to time out waiting for the pidfile to appear
        self.start_time = time.time()
        if cmd is None:
            # we're reattaching to an existing pid, so don't call
            # the superconstructor (we don't want to kick off a new
            # process)
            pass
        else:
            super(PidfileRunMonitor, self).__init__(cmd,
                                                    nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        """True if command_line contains our '-r <results_dir>' argument
        (args separated by `spacer`)."""
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self, pid):
        """True if /proc/<pid> exists and its cmdline matches our run."""
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            # process is gone (or unreadable): treat as not running
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def read_pidfile(self):
        """Return (pid, exit_status) from the pidfile; (None, None) when
        it does not exist.  Line 1 is the pid, optional line 2 the exit
        status.  Raises on a corrupt file."""
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise Exception('Corrupt pid file: ' + str(exc.args))

        return pid, exit_status


    def _find_autoserv_proc(self):
        """Scan live autoserv processes for one matching our results dir;
        returns (pid, args) or (None, None)."""
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
           None, None if autoserv has not yet run
           pid, None if autoserv is running
           pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            # already gave up on this process; report synthetic failure
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                email_manager.enqueue_notify_email(error,
                                                   message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            # waited too long: notify, kill any stray process, give up
            email_manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        # synthetic nonzero status: the real one is unknown
        self.exit_status = 1


    def exit_code(self):
        pid, exit_code = self.get_pidfile_info()
        return exit_code
863
864
mbligh36768f02008-02-22 18:28:33 +0000865class Agent(object):
mblighd5c95802008-03-05 00:33:46 +0000866 def __init__(self, tasks, queue_entry_ids=[]):
mbligh36768f02008-02-22 18:28:33 +0000867 self.active_task = None
868 self.queue = Queue.Queue(0)
869 self.dispatcher = None
mblighd5c95802008-03-05 00:33:46 +0000870 self.queue_entry_ids = queue_entry_ids
mbligh36768f02008-02-22 18:28:33 +0000871
872 for task in tasks:
873 self.add_task(task)
874
875
876 def add_task(self, task):
877 self.queue.put_nowait(task)
878 task.agent = self
879
880
881 def tick(self):
882 print "agent tick"
883 if self.active_task and not self.active_task.is_done():
884 self.active_task.poll()
885 else:
886 self._next_task();
887
888
889 def _next_task(self):
890 print "agent picking task"
891 if self.active_task:
892 assert self.active_task.is_done()
893
mblighe2586682008-02-29 22:45:46 +0000894 if not self.active_task.success:
895 self.on_task_failure()
896
mbligh36768f02008-02-22 18:28:33 +0000897 self.active_task = None
898 if not self.is_done():
899 self.active_task = self.queue.get_nowait()
900 if self.active_task:
901 self.active_task.start()
902
903
mblighe2586682008-02-29 22:45:46 +0000904 def on_task_failure(self):
mblighe2586682008-02-29 22:45:46 +0000905 self.queue = Queue.Queue(0)
906 for task in self.active_task.failure_tasks:
907 self.add_task(task)
mbligh16c722d2008-03-05 00:58:44 +0000908
mblighe2586682008-02-29 22:45:46 +0000909
showardec113162008-05-08 00:52:49 +0000910 def is_started(self):
911 return self.active_task is not None
912
913
mbligh36768f02008-02-22 18:28:33 +0000914 def is_done(self):
915 return self.active_task == None and self.queue.empty()
916
917
918 def start(self):
919 assert self.dispatcher
920
921 self._next_task()
922
mblighd5c95802008-03-05 00:33:46 +0000923
mbligh36768f02008-02-22 18:28:33 +0000924class AgentTask(object):
mbligh16c722d2008-03-05 00:58:44 +0000925 def __init__(self, cmd, failure_tasks = []):
mbligh36768f02008-02-22 18:28:33 +0000926 self.done = False
927 self.failure_tasks = failure_tasks
928 self.started = False
929 self.cmd = cmd
mblighd5c95802008-03-05 00:33:46 +0000930 self.task = None
mbligh36768f02008-02-22 18:28:33 +0000931 self.agent = None
mblighd5c95802008-03-05 00:33:46 +0000932 self.monitor = None
mblighd64e5702008-04-04 21:39:28 +0000933 self.success = None
mbligh36768f02008-02-22 18:28:33 +0000934
935
936 def poll(self):
937 print "poll"
mblighd5c95802008-03-05 00:33:46 +0000938 if self.monitor:
mbligh36768f02008-02-22 18:28:33 +0000939 self.tick(self.monitor.exit_code())
940 else:
941 self.finished(False)
942
943
944 def tick(self, exit_code):
945 if exit_code==None:
946 return
947# print "exit_code was %d" % exit_code
948 if exit_code == 0:
949 success = True
950 else:
951 success = False
952
953 self.finished(success)
954
955
956 def is_done(self):
957 return self.done
958
959
960 def finished(self, success):
961 self.done = True
962 self.success = success
963 self.epilog()
964
965
966 def prolog(self):
967 pass
968
mblighd64e5702008-04-04 21:39:28 +0000969
970 def create_temp_resultsdir(self, suffix=''):
971 self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)
972
973
974 def cleanup(self):
975 if (hasattr(self, 'temp_results_dir') and
976 os.path.exists(self.temp_results_dir)):
977 shutil.rmtree(self.temp_results_dir)
978
mbligh36768f02008-02-22 18:28:33 +0000979
980 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +0000981 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000982
983
984 def start(self):
985 assert self.agent
986
987 if not self.started:
988 self.prolog()
989 self.run()
990
991 self.started = True
992
mblighd64e5702008-04-04 21:39:28 +0000993
mbligh36768f02008-02-22 18:28:33 +0000994 def abort(self):
mblighd5c95802008-03-05 00:33:46 +0000995 if self.monitor:
996 self.monitor.kill()
997 self.done = True
mblighd64e5702008-04-04 21:39:28 +0000998 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +0000999
1000
1001 def run(self):
1002 if self.cmd:
1003 print "agent starting monitor"
mbligh36768f02008-02-22 18:28:33 +00001004 log_file = None
1005 if hasattr(self, 'host'):
mblighbb421852008-03-11 22:36:16 +00001006 log_file = os.path.join(RESULTS_DIR, 'hosts',
1007 self.host.hostname)
1008 self.monitor = RunMonitor(
1009 self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
1010 log_file = log_file)
mbligh36768f02008-02-22 18:28:33 +00001011
1012
1013class RepairTask(AgentTask):
mbligh16c722d2008-03-05 00:58:44 +00001014 def __init__(self, host, fail_queue_entry=None):
1015 """\
1016 fail_queue_entry: queue entry to mark failed if this repair
1017 fails.
1018 """
mblighd64e5702008-04-04 21:39:28 +00001019 self.create_temp_resultsdir('.repair')
1020 cmd = [_autoserv_path , '-R', '-m', host.hostname,
1021 '-r', self.temp_results_dir]
mbligh36768f02008-02-22 18:28:33 +00001022 self.host = host
mbligh16c722d2008-03-05 00:58:44 +00001023 self.fail_queue_entry = fail_queue_entry
mblighd64e5702008-04-04 21:39:28 +00001024 super(RepairTask, self).__init__(cmd)
mblighe2586682008-02-29 22:45:46 +00001025
mbligh36768f02008-02-22 18:28:33 +00001026
1027 def prolog(self):
1028 print "repair_task starting"
1029 self.host.set_status('Repairing')
1030
1031
1032 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +00001033 super(RepairTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +00001034 if self.success:
mbligh16c722d2008-03-05 00:58:44 +00001035 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001036 else:
mbligh16c722d2008-03-05 00:58:44 +00001037 self.host.set_status('Repair Failed')
1038 if self.fail_queue_entry:
1039 self.fail_queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +00001040
1041
class VerifyTask(AgentTask):
	"""\
	Runs 'autoserv -v' to verify a host, either standalone (host=)
	or on behalf of a queue entry (queue_entry=).  On failure a
	RepairTask is queued via failure_tasks.
	"""
	def __init__(self, queue_entry=None, host=None):
		# exactly one of queue_entry/host must be given
		assert bool(queue_entry) != bool(host)

		self.host = host or queue_entry.host
		self.queue_entry = queue_entry

		self.create_temp_resultsdir('.verify')
		cmd = [_autoserv_path,'-v','-m',self.host.hostname,
		       '-r', self.temp_results_dir]

		# for non-meta-host entries, a failed repair should fail
		# the queue entry itself
		fail_queue_entry = None
		if queue_entry and not queue_entry.meta_host:
			fail_queue_entry = queue_entry
		failure_tasks = [RepairTask(self.host, fail_queue_entry)]

		super(VerifyTask, self).__init__(cmd,
						 failure_tasks=failure_tasks)


	def prolog(self):
		print "starting verify on %s" % (self.host.hostname)
		if self.queue_entry:
			self.queue_entry.set_status('Verifying')
			self.queue_entry.clear_results_dir(
			    self.queue_entry.verify_results_dir())
		self.host.set_status('Verifying')


	def cleanup(self):
		"""Move verify results into place before the temp dir is
		removed by the superclass cleanup."""
		if not os.path.exists(self.temp_results_dir):
			return
		if self.queue_entry and (self.success or
					 not self.queue_entry.meta_host):
			self.move_results()
		super(VerifyTask, self).cleanup()


	def epilog(self):
		super(VerifyTask, self).epilog()

		if self.success:
			self.host.set_status('Ready')
		elif self.queue_entry:
			# host failed verify; give the entry another chance
			# (repair runs via failure_tasks)
			self.queue_entry.requeue()


	def move_results(self):
		"""Copy verify output (minus the pidfile) into the queue
		entry's verify results directory."""
		assert self.queue_entry is not None
		target_dir = self.queue_entry.verify_results_dir()
		if not os.path.exists(target_dir):
			os.makedirs(target_dir)
		files = os.listdir(self.temp_results_dir)
		for filename in files:
			if filename == AUTOSERV_PID_FILE:
				continue
			self.force_move(os.path.join(self.temp_results_dir,
						     filename),
					os.path.join(target_dir, filename))


	@staticmethod
	def force_move(source, dest):
		"""\
		Replacement for shutil.move() that will delete the destination
		if it exists, even if it's a directory.
		"""
		if os.path.exists(dest):
			print ('Warning: removing existing destination file ' +
			       dest)
			remove_file_or_dir(dest)
		shutil.move(source, dest)
1114
1115
mblighdffd6372008-02-29 22:47:33 +00001116class VerifySynchronousTask(VerifyTask):
1117 def __init__(self, queue_entry):
mblighd64e5702008-04-04 21:39:28 +00001118 super(VerifySynchronousTask, self).__init__(
1119 queue_entry = queue_entry)
mblighdffd6372008-02-29 22:47:33 +00001120
1121
mbligh16c722d2008-03-05 00:58:44 +00001122 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +00001123 super(VerifySynchronousTask, self).epilog()
mbligh16c722d2008-03-05 00:58:44 +00001124 if self.success:
1125 if self.queue_entry.job.num_complete() > 0:
1126 # some other entry failed verify, and we've
1127 # already been marked as stopped
1128 return
mblighdffd6372008-02-29 22:47:33 +00001129
mbligh16c722d2008-03-05 00:58:44 +00001130 self.queue_entry.set_status('Pending')
1131 job = self.queue_entry.job
1132 if job.is_ready():
1133 agent = job.run(self.queue_entry)
1134 self.agent.dispatcher.add_agent(agent)
mblighe2586682008-02-29 22:45:46 +00001135
mbligh36768f02008-02-22 18:28:33 +00001136class QueueTask(AgentTask):
1137 def __init__(self, job, queue_entries, cmd):
mblighd64e5702008-04-04 21:39:28 +00001138 super(QueueTask, self).__init__(cmd)
mbligh36768f02008-02-22 18:28:33 +00001139 self.job = job
1140 self.queue_entries = queue_entries
1141
1142
mblighbb421852008-03-11 22:36:16 +00001143 @staticmethod
1144 def _write_keyval(results_dir, field, value):
1145 key_path = os.path.join(results_dir, 'keyval')
mbligh36768f02008-02-22 18:28:33 +00001146 keyval_file = open(key_path, 'a')
1147 print >> keyval_file, '%s=%d' % (field, value)
1148 keyval_file.close()
1149
1150
mblighbb421852008-03-11 22:36:16 +00001151 def results_dir(self):
1152 return self.queue_entries[0].results_dir()
1153
1154
1155 def run(self):
1156 """\
1157 Override AgentTask.run() so we can use a PidfileRunMonitor.
1158 """
1159 self.monitor = PidfileRunMonitor(self.results_dir(),
1160 cmd=self.cmd,
1161 nice_level=AUTOSERV_NICE_LEVEL)
1162
1163
mbligh36768f02008-02-22 18:28:33 +00001164 def prolog(self):
mblighe2586682008-02-29 22:45:46 +00001165 # write some job timestamps into the job keyval file
1166 queued = time.mktime(self.job.created_on.timetuple())
1167 started = time.time()
mblighbb421852008-03-11 22:36:16 +00001168 self._write_keyval(self.results_dir(), "job_queued", queued)
1169 self._write_keyval(self.results_dir(), "job_started", started)
mbligh36768f02008-02-22 18:28:33 +00001170 for queue_entry in self.queue_entries:
1171 print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
1172 queue_entry.set_status('Running')
1173 queue_entry.host.set_status('Running')
mblighe2586682008-02-29 22:45:46 +00001174 if (not self.job.is_synchronous() and
1175 self.job.num_machines() > 1):
1176 assert len(self.queue_entries) == 1
1177 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001178
1179
jadmanskic2ac77f2008-05-16 21:44:04 +00001180 def _finish_task(self):
1181 # write out the finished time into the results keyval
1182 finished = time.time()
1183 self._write_keyval(self.results_dir(), "job_finished",
1184 finished)
1185
1186 # parse the results of the job
1187 if self.job.is_synchronous() or self.job.num_machines() == 1:
1188 parse_results(self.job.results_dir())
1189 else:
1190 for queue_entry in self.queue_entries:
1191 parse_results(queue_entry.results_dir(),
1192 flags="-l 2")
1193
1194
1195 def abort(self):
1196 super(QueueTask, self).abort()
1197 self._finish_task()
1198
1199
mbligh36768f02008-02-22 18:28:33 +00001200 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +00001201 super(QueueTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +00001202 if self.success:
1203 status = 'Completed'
1204 else:
1205 status = 'Failed'
1206
1207 for queue_entry in self.queue_entries:
1208 queue_entry.set_status(status)
1209 queue_entry.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001210
jadmanskic2ac77f2008-05-16 21:44:04 +00001211 self._finish_task()
mblighbb421852008-03-11 22:36:16 +00001212
mbligh36768f02008-02-22 18:28:33 +00001213 print "queue_task finished with %s/%s" % (status, self.success)
1214
1215
mblighbb421852008-03-11 22:36:16 +00001216class RecoveryQueueTask(QueueTask):
1217 def __init__(self, job, queue_entries, run_monitor):
mblighd64e5702008-04-04 21:39:28 +00001218 super(RecoveryQueueTask, self).__init__(job,
1219 queue_entries, cmd=None)
mblighbb421852008-03-11 22:36:16 +00001220 self.run_monitor = run_monitor
1221
1222
1223 def run(self):
1224 self.monitor = self.run_monitor
1225
1226
1227 def prolog(self):
1228 # recovering an existing process - don't do prolog
1229 pass
1230
1231
mbligh36768f02008-02-22 18:28:33 +00001232class RebootTask(AgentTask):
mblighd5c95802008-03-05 00:33:46 +00001233 def __init__(self, host):
1234 global _autoserv_path
1235
1236 # Current implementation of autoserv requires control file
1237 # to be passed on reboot action request. TODO: remove when no
1238 # longer appropriate.
mblighd64e5702008-04-04 21:39:28 +00001239 self.create_temp_resultsdir('.reboot')
mblighd5c95802008-03-05 00:33:46 +00001240 self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
mblighd64e5702008-04-04 21:39:28 +00001241 '-r', self.temp_results_dir, '/dev/null']
mbligh36768f02008-02-22 18:28:33 +00001242 self.host = host
mblighd64e5702008-04-04 21:39:28 +00001243 super(RebootTask, self).__init__(self.cmd,
mbligh16c722d2008-03-05 00:58:44 +00001244 failure_tasks=[RepairTask(host)])
1245
mblighd5c95802008-03-05 00:33:46 +00001246
1247 def prolog(self):
1248 print "starting reboot task for host: %s" % self.host.hostname
1249 self.host.set_status("Rebooting")
1250
mblighd5c95802008-03-05 00:33:46 +00001251
1252class AbortTask(AgentTask):
1253 def __init__(self, queue_entry, agents_to_abort):
1254 self.queue_entry = queue_entry
1255 self.agents_to_abort = agents_to_abort
1256 for agent in agents_to_abort:
1257 agent.dispatcher.remove_agent(agent)
mblighd64e5702008-04-04 21:39:28 +00001258 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001259
1260
mblighd5c95802008-03-05 00:33:46 +00001261 def prolog(self):
1262 print "starting abort on host %s, job %s" % (
1263 self.queue_entry.host_id, self.queue_entry.job_id)
1264 self.queue_entry.set_status('Aborting')
1265
mbligh36768f02008-02-22 18:28:33 +00001266
mblighd5c95802008-03-05 00:33:46 +00001267 def epilog(self):
mblighd64e5702008-04-04 21:39:28 +00001268 super(AbortTask, self).epilog()
mblighd5c95802008-03-05 00:33:46 +00001269 self.queue_entry.set_status('Aborted')
1270 self.success = True
mbligh36768f02008-02-22 18:28:33 +00001271
mblighd64e5702008-04-04 21:39:28 +00001272
mbligh36768f02008-02-22 18:28:33 +00001273 def run(self):
mblighd5c95802008-03-05 00:33:46 +00001274 for agent in self.agents_to_abort:
1275 if (agent.active_task):
1276 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001277
1278
class DBObject(object):
	"""\
	Base class wrapping one row of a database table; column values
	are exposed as instance attributes.  Subclasses must override
	_get_table() and _fields().
	"""
	def __init__(self, id=None, row=None, new_record=False):
		# exactly one of id/row must be supplied
		assert (bool(id) != bool(row))

		self.__table = self._get_table()
		fields = self._fields()

		self.__new_record = new_record

		if row is None:
			sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
			rows = _db.execute(sql, (id,))
			if len(rows) == 0:
				# raise a real exception object; raising a
				# plain string is invalid in Python >= 2.6
				raise Exception(
				    "row not found (table=%s, id=%s)" %
				    (self.__table, id))
			row = rows[0]

		assert len(row) == self.num_cols(), (
		    "table = %s, row = %s/%d, fields = %s/%d" % (
		    self.__table, row, len(row), fields, self.num_cols()))

		# expose each column as an attribute and remember which
		# attributes are updatable via update_field()
		self.__valid_fields = {}
		for i,value in enumerate(row):
			self.__dict__[fields[i]] = value
			self.__valid_fields[fields[i]] = True

		del self.__valid_fields['id']


	@classmethod
	def _get_table(cls):
		raise NotImplementedError('Subclasses must override this')


	@classmethod
	def _fields(cls):
		raise NotImplementedError('Subclasses must override this')


	@classmethod
	def num_cols(cls):
		"""Number of columns in this object's table."""
		return len(cls._fields())


	def count(self, where, table = None):
		"""Return the number of rows in table matching where."""
		if not table:
			table = self.__table

		rows = _db.execute("""
			SELECT count(*) FROM %s
			WHERE %s
		""" % (table, where))

		assert len(rows) == 1

		return int(rows[0][0])


	def update_field(self, field, value):
		"""Set one column in memory and in the database, skipping
		the UPDATE when the value is unchanged."""
		assert self.__valid_fields[field]

		if self.__dict__[field] == value:
			return

		query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
			(self.__table, field)
		_db.execute(query, (value, self.id))

		self.__dict__[field] = value


	def save(self):
		"""INSERT this object's row if created with new_record."""
		if self.__new_record:
			keys = self._fields()[1:] # avoid id
			columns = ','.join([str(key) for key in keys])
			values = ['"%s"' % self.__dict__[key] for key in keys]
			values = ','.join(values)
			query = """INSERT INTO %s (%s) VALUES (%s)""" % \
				(self.__table, columns, values)
			_db.execute(query)


	def delete(self):
		"""DELETE this object's row from its table."""
		query = 'DELETE FROM %s WHERE id=%%s' % self.__table
		_db.execute(query, (self.id,))


	@classmethod
	def fetch(cls, where, params=()):
		"""Yield an instance for every row matching where."""
		rows = _db.execute(
		    'SELECT * FROM %s WHERE %s' % (cls._get_table(), where),
		    params)
		for row in rows:
			yield cls(row=row)
1373
mbligh36768f02008-02-22 18:28:33 +00001374
class IneligibleHostQueue(DBObject):
	"""\
	Row in ineligible_host_queues; each row blocks one host from
	being rescheduled for one job.
	"""
	def __init__(self, id=None, row=None, new_record=None):
		super(IneligibleHostQueue, self).__init__(
			id=id, row=row, new_record=new_record)


	@classmethod
	def _fields(cls):
		return ['id', 'job_id', 'host_id']


	@classmethod
	def _get_table(cls):
		return 'ineligible_host_queues'
1389
1390
mbligh36768f02008-02-22 18:28:33 +00001391class Host(DBObject):
1392 def __init__(self, id=None, row=None):
showard04c82c52008-05-29 19:38:12 +00001393 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001394
1395
1396 @classmethod
1397 def _get_table(cls):
1398 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001399
1400
showard04c82c52008-05-29 19:38:12 +00001401 @classmethod
1402 def _fields(cls):
1403 return ['id', 'hostname', 'locked', 'synch_id','status',
1404 'invalid']
1405
1406
mbligh36768f02008-02-22 18:28:33 +00001407 def current_task(self):
mbligh6f8bab42008-02-29 22:45:14 +00001408 rows = _db.execute("""
mbligh36768f02008-02-22 18:28:33 +00001409 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1410 """, (self.id,))
1411
mbligh6f8bab42008-02-29 22:45:14 +00001412 if len(rows) == 0:
mbligh36768f02008-02-22 18:28:33 +00001413 return None
1414 else:
mbligh6f8bab42008-02-29 22:45:14 +00001415 assert len(rows) == 1
1416 results = rows[0];
mbligh36768f02008-02-22 18:28:33 +00001417# print "current = %s" % results
1418 return HostQueueEntry(row=results)
1419
1420
mbligh36768f02008-02-22 18:28:33 +00001421 def yield_work(self):
1422 print "%s yielding work" % self.hostname
1423 if self.current_task():
1424 self.current_task().requeue()
1425
1426 def set_status(self,status):
mblighbb421852008-03-11 22:36:16 +00001427 print '%s -> %s' % (self.hostname, status)
mbligh36768f02008-02-22 18:28:33 +00001428 self.update_field('status',status)
1429
1430
class HostQueueEntry(DBObject):
	"""\
	Row in host_queue_entries: one (job, host) pairing together
	with its scheduling status and active/complete flags.
	"""
	def __init__(self, id=None, row=None):
		assert id or row
		super(HostQueueEntry, self).__init__(id=id, row=row)
		self.job = Job(self.job_id)

		if self.host_id:
			self.host = Host(self.host_id)
		else:
			self.host = None

		# per-entry log of scheduling decisions, kept alongside the
		# job's results
		self.queue_log_path = os.path.join(self.job.results_dir(),
						   'queue.log.' + str(self.id))


	@classmethod
	def _get_table(cls):
		return 'host_queue_entries'


	@classmethod
	def _fields(cls):
		return ['id', 'job_id', 'host_id', 'priority', 'status',
			  'meta_host', 'active', 'complete']


	def set_host(self, host):
		"""Assign (or, if host is None, release) this entry's host,
		maintaining the job's ineligible-host blocks."""
		if host:
			self.queue_log_record('Assigning host ' + host.hostname)
			self.update_field('host_id', host.id)
			self.update_field('active', True)
			self.block_host(host.id)
		else:
			self.queue_log_record('Releasing host')
			self.unblock_host(self.host.id)
			self.update_field('host_id', None)

		self.host = host


	def get_host(self):
		return self.host


	def queue_log_record(self, log_line):
		# timestamped, unbuffered append to this entry's queue log
		now = str(datetime.datetime.now())
		queue_log = open(self.queue_log_path, 'a', 0)
		queue_log.write(now + ' ' + log_line + '\n')
		queue_log.close()


	def block_host(self, host_id):
		"""Make host_id ineligible for other entries of this job."""
		print "creating block %s/%s" % (self.job.id, host_id)
		row = [0, self.job.id, host_id]
		block = IneligibleHostQueue(row=row, new_record=True)
		block.save()


	def unblock_host(self, host_id):
		"""Remove any job/host ineligibility blocks for host_id."""
		print "removing block %s/%s" % (self.job.id, host_id)
		blocks = IneligibleHostQueue.fetch(
		    'job_id=%d and host_id=%d' % (self.job.id, host_id))
		for block in blocks:
			block.delete()


	def results_dir(self):
		# synchronous and single-machine jobs share the job dir;
		# asynchronous multi-machine entries get a per-host subdir
		if self.job.is_synchronous() or self.job.num_machines() == 1:
			return self.job.job_dir
		else:
			assert self.host
			return os.path.join(self.job.job_dir,
					    self.host.hostname)


	def verify_results_dir(self):
		# note the condition differs from results_dir(): verify
		# output for synchronous jobs is kept per-host
		if self.job.is_synchronous() or self.job.num_machines() > 1:
			assert self.host
			return os.path.join(self.job.job_dir,
					    self.host.hostname)
		else:
			return self.job.job_dir


	def set_status(self, status):
		"""Update status and derive the active/complete flags."""
		self.update_field('status', status)
		if self.host:
			hostname = self.host.hostname
		else:
			hostname = 'no host'
		print "%s/%d status -> %s" % (hostname, self.id, self.status)
		if status in ['Queued']:
			self.update_field('complete', False)
			self.update_field('active', False)

		if status in ['Pending', 'Running', 'Verifying', 'Starting',
			      'Abort', 'Aborting']:
			self.update_field('complete', False)
			self.update_field('active', True)

		if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
			self.update_field('complete', True)
			self.update_field('active', False)


	def run(self,assigned_host=None):
		"""Assign a host (required for meta-host entries) and hand
		off to Job.run(); returns the resulting Agent."""
		if self.meta_host:
			assert assigned_host
			# ensure results dir exists for the queue log
			self.job.create_results_dir()
			self.set_host(assigned_host)

		print "%s/%s scheduled on %s, status=%s" % (self.job.name,
				self.meta_host, self.host.hostname, self.status)

		return self.job.run(queue_entry=self)

	def requeue(self):
		self.set_status('Queued')

		# a meta-host entry goes back to the pool of eligible hosts
		if self.meta_host:
			self.set_host(None)


	def handle_host_failure(self):
		"""\
		Called when this queue entry's host has failed verification and
		repair.
		"""
		assert not self.meta_host
		self.set_status('Failed')
		if self.job.is_synchronous():
			self.job.stop_all_entries()


	def clear_results_dir(self, results_dir=None, dont_delete_files=False):
		"""Empty the results dir; with dont_delete_files, move its
		contents into a temp dir instead of deleting them."""
		results_dir = results_dir or self.results_dir()
		if not os.path.exists(results_dir):
			return
		if dont_delete_files:
			temp_dir = tempfile.mkdtemp(suffix='.clear_results')
			print 'Moving results from %s to %s' % (results_dir,
								temp_dir)
		for filename in os.listdir(results_dir):
			path = os.path.join(results_dir, filename)
			if dont_delete_files:
				shutil.move(path,
					    os.path.join(temp_dir, filename))
			else:
				remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001581
1582
class Job(DBObject):
	"""\
	Row in the jobs table, plus helpers for launching the job's
	autoserv process(es) and tracking its host queue entries.
	"""
	def __init__(self, id=None, row=None):
		assert id or row
		super(Job, self).__init__(id=id, row=row)

		# all results for this job live in RESULTS_DIR/<id>-<owner>
		self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
								    self.owner))


	@classmethod
	def _get_table(cls):
		return 'jobs'


	@classmethod
	def _fields(cls):
		return ['id', 'owner', 'name', 'priority', 'control_file',
			'control_type', 'created_on', 'synch_type',
			'synch_count', 'synchronizing']


	def is_server_job(self):
		# control_type 2 means a client-side job
		return self.control_type != 2


	def get_host_queue_entries(self):
		"""Return all HostQueueEntry objects for this job (>= 1)."""
		rows = _db.execute("""
			SELECT * FROM host_queue_entries
			WHERE job_id= %s
		""", (self.id,))
		entries = [HostQueueEntry(row=i) for i in rows]

		assert len(entries)>0

		return entries


	def set_status(self, status, update_queues=False):
		"""Set this job's status, optionally propagating it to all
		of the job's queue entries."""
		self.update_field('status',status)

		if update_queues:
			for queue_entry in self.get_host_queue_entries():
				queue_entry.set_status(status)


	def is_synchronous(self):
		# synch_type 2 means all machines must run together
		return self.synch_type == 2


	def is_ready(self):
		"""An asynchronous job is always ready; a synchronous job
		is ready once synch_count entries are Pending."""
		if not self.is_synchronous():
			return True
		sql = "job_id=%s AND status='Pending'" % self.id
		count = self.count(sql, table='host_queue_entries')
		return (count == self.synch_count)


	def ready_to_synchronize(self):
		# heuristic
		# NOTE(review): count/self.synch_count is integer division
		# in Python 2, so this only returns True once count >=
		# synch_count; float division was probably intended - confirm
		queue_entries = self.get_host_queue_entries()
		count = 0
		for queue_entry in queue_entries:
			if queue_entry.status == 'Pending':
				count += 1

		return (count/self.synch_count >= 0.5)


	def start_synchronizing(self):
		self.update_field('synchronizing', True)


	def results_dir(self):
		return self.job_dir

	def num_machines(self, clause = None):
		"""Count this job's queue entries, optionally filtered by
		an extra SQL clause."""
		sql = "job_id=%s" % self.id
		if clause:
			sql += " AND (%s)" % clause
		return self.count(sql, table='host_queue_entries')


	def num_queued(self):
		return self.num_machines('not complete')


	def num_active(self):
		return self.num_machines('active')


	def num_complete(self):
		return self.num_machines('complete')


	def is_finished(self):
		left = self.num_queued()
		print "%s: %s machines left" % (self.name, left)
		return left==0

	def stop_synchronizing(self):
		self.update_field('synchronizing', False)
		self.set_status('Queued', update_queues = False)


	def stop_all_entries(self):
		"""Mark every incomplete queue entry of this job Stopped."""
		for child_entry in self.get_host_queue_entries():
			if not child_entry.complete:
				child_entry.set_status('Stopped')


	def write_to_machines_file(self, queue_entry):
		# record the host in the job's .machines file (used by
		# asynchronous multi-machine jobs)
		hostname = queue_entry.get_host().hostname
		print "writing %s to job %s machines file" % (hostname, self.id)
		file_path = os.path.join(self.job_dir, '.machines')
		mf = open(file_path, 'a')
		mf.write("%s\n" % queue_entry.get_host().hostname)
		mf.close()


	def create_results_dir(self, queue_entry=None):
		"""Ensure the job dir exists; return the results dir for
		queue_entry if given, else the job dir."""
		print "create: active: %s complete %s" % (self.num_active(),
							  self.num_complete())

		if not os.path.exists(self.job_dir):
			os.makedirs(self.job_dir)

		if queue_entry:
			return queue_entry.results_dir()
		return self.job_dir


	def run(self, queue_entry):
		"""Build and return an Agent that runs this job for
		queue_entry (verify task(s) plus the autoserv QueueTask)."""
		results_dir = self.create_results_dir(queue_entry)

		if self.is_synchronous():
			if not self.is_ready():
				# not all machines pending yet - just verify
				# this entry and wait for the rest
				return Agent([VerifySynchronousTask(
						queue_entry = queue_entry)],
					     [queue_entry.id])

		queue_entry.set_status('Starting')

		# dump the control file to a temp file for autoserv
		# NOTE(review): os.tmpnam() is race-prone; tempfile would be
		# safer - confirm before changing
		ctrl = open(os.tmpnam(), 'w')
		if self.control_file:
			ctrl.write(self.control_file)
		else:
			ctrl.write("")
		ctrl.flush()

		if self.is_synchronous():
			queue_entries = self.get_host_queue_entries()
		else:
			assert queue_entry
			queue_entries = [queue_entry]
		hostnames = ','.join([entry.get_host().hostname
				      for entry in queue_entries])

		# determine the job tag
		if self.is_synchronous() or self.num_machines() == 1:
			job_name = "%s-%s" % (self.id, self.owner)
		else:
			job_name = "%s-%s/%s" % (self.id, self.owner,
						 hostnames)

		params = [_autoserv_path, '-P', job_name, '-p', '-n',
			  '-r', os.path.abspath(results_dir),
			  '-b', '-u', self.owner, '-l', self.name,
			  '-m', hostnames, ctrl.name]

		if not self.is_server_job():
			params.append('-c')

		# asynchronous jobs verify inline before the queue task
		tasks = []
		if not self.is_synchronous():
			tasks.append(VerifyTask(queue_entry))

		tasks.append(QueueTask(job = self,
				       queue_entries = queue_entries,
				       cmd = params))

		ids = []
		for entry in queue_entries:
			ids.append(entry.id)

		agent = Agent(tasks, ids)

		return agent
1770
1771
# script entry point - see main() defined near the top of the file
if __name__ == '__main__':
	main()