blob: 2eec32f5ee4bca49d56de1c594185d393e1591d7 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
6__author__ = "Paul Turner <pjt@google.com>"
7
8import os, sys, tempfile, shutil, MySQLdb, time, traceback, subprocess, Queue
mblighcadb3532008-04-15 17:46:26 +00009import optparse, signal, smtplib, socket, datetime, stat, pwd, errno
mblighb090f142008-02-27 21:33:46 +000010from common import global_config
11
# Directory holding job results; main() replaces this with the results_dir
# command-line argument.
RESULTS_DIR = '.'
# nice level that AgentTask.run() hands to RunMonitor
AUTOSERV_NICE_LEVEL = 10

# The autotest root defaults to the parent of this file's directory; the
# AUTOTEST_DIR environment variable, when present, takes precedence.
AUTOTEST_PATH = os.environ.get(
    'AUTOTEST_DIR', os.path.join(os.path.dirname(__file__), '..'))
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# put the server directory at the front of the module search path
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# module-wide state, filled in by main() and init()
_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
_global_config_section = 'SCHEDULER'
mbligh36768f02008-02-22 18:28:33 +000035
36
def main():
    """\
    Entry point of the scheduler.

    Parses the command line (one positional argument, the results
    directory), reads the notification address from the global config,
    connects to the database through init(), runs the initial recovery
    and then calls Dispatcher.tick() every 20 seconds until a shutdown is
    requested or an exception escapes.
    """
    global RESULTS_DIR, _notify_email, _autoserv_path, _testing_mode

    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to.  Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    if options.test:
        _autoserv_path = 'autoserv_dummy'
        _testing_mode = True

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(20)
    except:
        # top-level boundary: report whatever stopped the loop, then
        # fall through to the cleanup below
        log_stacktrace("Uncaught exception; terminating monitor_db")

    try:
        # log_stacktrace() only queues its notification and no further
        # tick() will run, so the queue has to be flushed here or the
        # crash report is never mailed
        email_manager.send_queued_emails()
    finally:
        _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +000082
83
def handle_sigint(signum, frame):
    """\
    Signal handler (installed for SIGINT by init()) that requests a
    shutdown by setting the module-level _shutdown flag, which the loop
    in main() checks before every tick.
    """
    global _shutdown
    _shutdown = True
    print("Shutdown request received.")
88
89
def init(logfile):
    """\
    One-time process setup: optionally redirect output to logfile, put
    the autotest server directory at the front of PATH, open the database
    connection (stored in the module-level _db) and install the SIGINT
    handler.
    """
    global _db

    if logfile:
        enable_logging(logfile)
    print("%s> dispatcher starting" % time.strftime("%X %x"))
    print("My PID is %d" % os.getpid())

    os.environ['PATH'] = ':'.join((AUTOTEST_SERVER_DIR,
                                   os.environ['PATH']))
    _db = DatabaseConn()

    print("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    print("Connected! Running...")
104
105
def enable_logging(logfile):
    """\
    Redirect stdout to logfile and stderr to logfile + ".err".

    Both files are opened unbuffered in append mode.  The redirection is
    done on the file descriptors (os.dup2) as well as on sys.stdout and
    sys.stderr.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    print("Enabling logging to %s (%s)" % (out_file, err_file))
    out_fd, err_fd = [open(path, "a", buffering=0)
                      for path in (out_file, err_file)]

    for log_fd, stream in ((out_fd, sys.stdout), (err_fd, sys.stderr)):
        os.dup2(log_fd.fileno(), stream.fileno())

    sys.stdout, sys.stderr = out_fd, err_fd
118
119
def idle_hosts():
    """\
    Return a Host object for every host that could take new work now.

    The query selects hosts that have no active queue entry, that have a
    queue entry which is neither complete nor active (assigned to the
    host directly, or to a label of the host through meta_host), that are
    neither locked nor invalid, and whose status is NULL or 'Ready'.
    """
    query = """
        SELECT * FROM hosts h WHERE
        id NOT IN (SELECT host_id FROM host_queue_entries WHERE active) AND (
            (id IN (SELECT host_id FROM host_queue_entries WHERE not complete AND not active))
            OR
            (id IN (SELECT DISTINCT hl.host_id FROM host_queue_entries hqe
            INNER JOIN hosts_labels hl ON hqe.meta_host=hl.label_id WHERE not hqe.complete AND not hqe.active))
        )
        AND locked=false AND invalid=false
        AND (h.status IS null OR h.status='Ready') """
    return [Host(row=row) for row in _db.execute(query)]
133
def queue_entries_to_abort():
    """\
    Return a HostQueueEntry for every queue entry whose status is 'Abort'.
    """
    query = """
        SELECT * FROM host_queue_entries WHERE status='Abort';
        """
    return [HostQueueEntry(row=row) for row in _db.execute(query)]
mbligh36768f02008-02-22 18:28:33 +0000140
def remove_file_or_dir(path):
    """\
    Delete path, whether it is a regular file, a symbolic link or a
    directory tree.

    A symbolic link is unlinked itself, even when it points at a
    directory (or at nothing): shutil.rmtree() refuses to operate on a
    symlink, and the link's target must be left alone.

    Raises OSError if path does not exist or cannot be removed.
    """
    if os.path.isdir(path) and not os.path.islink(path):
        # real directory
        shutil.rmtree(path)
    else:
        # file or symlink
        os.remove(path)
148
149
class DatabaseConn:
    """\
    Wrapper around a MySQLdb connection to the AUTOTEST_WEB database
    that keeps retrying, every reconnect_wait seconds, whenever
    connecting or executing fails with MySQLdb.OperationalError.
    """
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        import MySQLdb.converters
        # customize a copy, so that the conversion table shared by every
        # user of MySQLdb in this process is left untouched
        self.convert_dict = MySQLdb.converters.conversions.copy()
        self.convert_dict.setdefault(bool, self.convert_boolean)

        self.connect()


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self):
        """\
        (Re)open the connection and cursor using the settings of the
        AUTOTEST_WEB config section.  Blocks until a connection has
        been established.
        """
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        db_host = c.get_config_value(dbase, "host")
        db_schema = c.get_config_value(dbase, "database")

        if _testing_mode:
            db_schema = 'stresstest_autotest_web'

        db_user = c.get_config_value(dbase, "user")
        db_pass = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=db_host, user=db_user, passwd=db_pass,
                    db=db_schema, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print("Can't connect to MYSQL; reconnecting")
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        """Close the connection, if any, and forget connection and cursor."""
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        """\
        Run a statement (arguments are passed straight to
        cursor.execute) and return all result rows.  On
        MySQLdb.OperationalError the connection is re-established and
        the statement is tried again, indefinitely.
        """
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print("MYSQL connection died; reconnecting")
                time.sleep(self.reconnect_wait)
                self.connect()
mbligh36768f02008-02-22 18:28:33 +0000217
218
def generate_parse_command(results_dir, flags=""):
    """\
    Return the shell command line that runs the tko 'parse' program on
    results_dir in the background, with its output redirected to
    <results_dir>/.parse.log.  flags is inserted verbatim after the
    program name.
    """
    parser_path = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    log_path = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    return "%s %s -r -o %s > %s 2>&1 &" % (
        parser_path, flags, results_dir, log_path)
224
225
def parse_results(results_dir, flags=""):
    """\
    Launch the results parser on results_dir through the shell.  Does
    nothing when the scheduler runs in testing mode.
    """
    if not _testing_mode:
        os.system(generate_parse_command(results_dir, flags))
mbligh36768f02008-02-22 18:28:33 +0000230
231
mblighbb421852008-03-11 22:36:16 +0000232
233
def log_stacktrace(reason):
    """\
    Report the exception currently being handled: write reason and the
    formatted traceback to stderr and queue the same text as a
    notification email.  Meant to be called from an except block.
    """
    trace_lines = traceback.format_exception(*sys.exc_info())
    report = "EXCEPTION: %s\n" % reason
    report += ''.join(trace_lines)

    sys.stderr.write("\n%s\n" % report)
    email_manager.enqueue_notify_email("monitor_db exception", report)
mbligh36768f02008-02-22 18:28:33 +0000241
mblighbb421852008-03-11 22:36:16 +0000242
def get_proc_poll_fn(pid):
    """\
    Return a function that polls the process pid by looking for its
    /proc entry.  The function returns None while /proc/<pid> exists and
    0 once it is gone.
    """
    proc_path = os.path.join('/proc', str(pid))

    def poll_fn():
        if not os.path.exists(proc_path):
            return 0 # we can't get a real exit code
        return None

    return poll_fn
250
251
def kill_autoserv(pid, poll_fn=None):
    """\
    Send SIGCONT followed by SIGTERM to process pid, unless it has
    already finished.

    poll_fn: callable returning None while the process is running;
             defaults to a check of /proc/<pid>.

    A process that disappears between the poll and the signal is not an
    error.  Any other OSError from os.kill() is propagated.
    """
    print('killing %s' % (pid,))
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() is not None:
        return
    try:
        # presumably SIGCONT comes first so that a stopped process gets
        # to act on the SIGTERM
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)
    except OSError:
        # ESRCH: the process exited after we polled it
        if sys.exc_info()[1].errno != errno.ESRCH:
            raise
mbligh36768f02008-02-22 18:28:33 +0000259
260
class EmailNotificationManager(object):
    """\
    Collects notification messages and mails them to the configured
    notify address in a single email per send_queued_emails() call.
    """
    def __init__(self):
        self._emails = []
        # see os.getlogin() online docs
        self._sender = pwd.getpwuid(os.getuid())[0]


    def enqueue_notify_email(self, subject, message):
        """\
        Queue a message for the next send_queued_emails().  Does
        nothing when no notify address is configured.
        """
        if not _notify_email:
            return

        body = 'Subject: ' + subject + '\n'
        body += "%s / %s / %s\n%s" % (socket.gethostname(),
                                      os.getpid(),
                                      time.strftime("%X %x"), message)
        self._emails.append(body)


    def send_queued_emails(self):
        """\
        Mail all queued messages through the SMTP server on localhost
        and empty the queue.

        A delivery failure is reported on stderr instead of being
        raised, so that a mail problem cannot stop the scheduler.  The
        queue is emptied in that case too; the messages are not
        retried.
        """
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        body = separator.join(self._emails)
        msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
            self._sender, _notify_email, subject, body)

        try:
            mailer = smtplib.SMTP('localhost')
            try:
                mailer.sendmail(self._sender, _notify_email, msg)
            finally:
                mailer.quit()
        except (smtplib.SMTPException, socket.error):
            sys.stderr.write('Sending notification email failed\n')
            traceback.print_exc()
        self._emails = []


email_manager = EmailNotificationManager()
294
295
class Dispatcher:
    """\
    Core of the scheduler.  Keeps the list of agents, creates agents
    for newly runnable and for aborted queue entries, ticks the agents
    and, at startup, recovers the state left behind by a previous run.
    """
    # dict mapping pid -> command line of the root autoserv processes,
    # filled in by find_autoservs() and reset to None by every tick()
    autoserv_procs_cache = None
    # pids in autoserv_procs_cache whose parent pid is 1; always set
    # together with autoserv_procs_cache
    _autoserv_orphan_pids = frozenset()
    max_running_agents = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_jobs_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))


    def __init__(self):
        self._agents = []


    def do_initial_recovery(self, recover_hosts=True):
        """\
        Recover from a previous scheduler run.  Processes are always
        recovered; 'Repair Failed' hosts only if recover_hosts is set.
        """
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        """Run one scheduling cycle."""
        Dispatcher.autoserv_procs_cache = None
        self._find_aborting()
        self._find_more_work()
        self._handle_agents()
        self._clear_inactive_blocks()
        email_manager.send_queued_emails()


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self


    def get_agents(self, queue_entry):
        """Return the agents working on the specified queue_entry."""
        return [agent for agent in self._agents
                if queue_entry.id in agent.queue_entry_ids]


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def num_started_agents(self):
        return len([agent for agent in self._agents
                    if agent.is_started()])


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes.  If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).

        The process list is read from ps once and then served from
        autoserv_procs_cache until that is reset to None.  Without
        orphans_only the cached dict itself is returned; with
        orphans_only the result is a new dict.
        """
        if cls.autoserv_procs_cache is None:
            proc = subprocess.Popen(
                ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
                stdout=subprocess.PIPE)
            autoserv_procs = {}
            orphan_pids = set()
            for line in proc.communicate()[0].splitlines():
                # split each line into the five columns output by ps
                fields = line.split(None, 4)
                if len(fields) < 5:
                    continue
                pid, pgid, ppid, comm, args = fields
                # only root autoserv processes have pgid == pid
                if comm != 'autoserv' or pgid != pid:
                    continue
                # map pid to args
                autoserv_procs[int(pid)] = args
                # the ps columns are strings, so compare with '1'
                if ppid == '1':
                    orphan_pids.add(int(pid))
            # the cache always holds the complete list, whatever the
            # first caller asked for
            cls.autoserv_procs_cache = autoserv_procs
            cls._autoserv_orphan_pids = orphan_pids

        if not orphans_only:
            return cls.autoserv_procs_cache
        return dict([(pid, args) for pid, args
                     in cls.autoserv_procs_cache.items()
                     if pid in cls._autoserv_orphan_pids])


    def recover_queue_entry(self, queue_entry, run_monitor):
        """\
        Add an agent with a RecoveryQueueTask that follows, through
        run_monitor, the autoserv process of queue_entry.  For a
        synchronous job the task covers all queue entries of the job.
        """
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [entry.id for entry in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print('Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid))
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print('Requeuing running QE %d' % queue_entry.id)
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in list(orphans.keys()):
            print('Killing orphan %d (%s)' % (pid, orphans[pid]))
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print('Recovering aborting QE %d' % queue_entry.id)
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host = queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')
            rebooting_host_ids.add(queue_host.id)

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=()):
        """\
        Add a verify agent for every unlocked, valid host matching the
        SQL condition where, except hosts whose id is in exclude_ids.
        print_message, unless None, is printed with the hostname filled
        in.
        """
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print(print_message % host.hostname)
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _find_more_work(self):
        """\
        For every idle host, try its next queue entries in order until
        one of them yields an agent.  An entry whose run() raises is
        marked 'Failed' and the error is logged.
        """
        print("finding work")

        for host in idle_hosts():
            tasks = host.next_queue_entries()
            if not tasks:
                continue
            for queue_entry in tasks:
                try:
                    agent = queue_entry.run(assigned_host=host)
                    if agent:
                        self.add_agent(agent)
                        break
                except:
                    # deliberately broad: one bad entry must not stop
                    # the scheduling of the others
                    queue_entry.set_status('Failed')
                    log_stacktrace("task_id = %d" % queue_entry.id)


    def _find_aborting(self):
        """\
        Create reboot + verify agents (preceded by an AbortTask when
        agents are working on the entry) for queue entries in status
        'Abort'; at most 50 entries per call.
        """
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _handle_agents(self):
        """\
        Tick every started agent, and start (tick) unstarted agents as
        long as neither max_running_agents nor
        max_jobs_started_per_cycle is reached.  Agents that are done
        are dropped from the agent list.
        """
        still_running = []
        num_started = self.num_started_agents()
        start_new = (num_started < self.max_running_agents)
        num_started_this_cycle = 0
        for agent in self._agents:
            if not agent.is_started():
                if not start_new:
                    # not allowed to start yet; keep it for later
                    still_running.append(agent)
                    continue
                num_started += 1
                num_started_this_cycle += 1
                if (num_started >= self.max_running_agents or
                    num_started_this_cycle >=
                    self.max_jobs_started_per_cycle):
                    start_new = False
            agent.tick()
            if not agent.is_done():
                still_running.append(agent)
            else:
                print("agent finished")
        self._agents = still_running
        print('%s running agents' % (num_started,))
mbligh36768f02008-02-22 18:28:33 +0000583
584
class RunMonitor(object):
    """\
    Starts a command as a child process as soon as it is constructed
    and gives access to the pid and exit code of that process.
    """
    def __init__(self, cmd, nice_level = None, log_file = None):
        """\
        cmd: the command as a list of arguments.
        nice_level: if set (and non-zero), run the command through
                    'nice -n <nice_level>'.
        log_file: file that the output of the command is appended to;
                  without it the output goes to /dev/null.
        """
        self.nice_level = nice_level
        self.log_file = log_file
        self.proc = self.run(cmd)


    def run(self, cmd):
        """Start cmd and return the subprocess.Popen object."""
        if self.nice_level:
            cmd = ['nice', '-n', str(self.nice_level)] + list(cmd)

        out_file = self._open_log_file(cmd)
        if not out_file:
            out_file = open('/dev/null', 'w')

        # the child gets its own copies of the descriptors, so ours are
        # closed again whether or not the process could be started
        try:
            in_devnull = open('/dev/null', 'r')
            try:
                print("cmd = %s" % (cmd,))
                print("path = %s" % os.getcwd())

                return subprocess.Popen(cmd, stdout=out_file,
                                        stderr=subprocess.STDOUT,
                                        stdin=in_devnull)
            finally:
                in_devnull.close()
        finally:
            out_file.close()


    def _open_log_file(self, cmd):
        """\
        Open self.log_file for appending, creating its directory if
        needed, and write a header naming cmd.  Returns the open file,
        or None if there is no log file or it could not be opened.
        Problems are reported through log_stacktrace().
        """
        if not self.log_file:
            return None

        log_dir = os.path.dirname(self.log_file)
        if log_dir:
            try:
                os.makedirs(log_dir)
            except OSError:
                # an existing directory is the normal case
                if sys.exc_info()[1].errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)

        out_file = None
        try:
            out_file = open(self.log_file, 'a')
            out_file.write("\n%s\n" % ('*'*80))
            out_file.write("%s> %s\n" %
                           (time.strftime("%X %x"), cmd))
            out_file.write("%s\n" % ('*'*80))
        except (OSError, IOError):
            log_stacktrace('Error opening log file %s' %
                           self.log_file)
        return out_file


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        """Return the exit code, or None while the process is running."""
        return self.proc.poll()
640
641
class PidfileException(Exception):
    """\
    Error type for a pid file that is not in a state the scheduler
    expects.
    """
646
647
class PidfileRunMonitor(RunMonitor):
    """\
    RunMonitor that takes the pid and the exit status of autoserv from
    the pid file in the results directory instead of from a child
    process object.  It can therefore also follow an autoserv that was
    started by someone else (construct it with cmd=None).
    """
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        if cmd is not None:
            super(PidfileRunMonitor, self).__init__(cmd,
                                                    nice_level, log_file)
        # otherwise we're reattaching to an existing pid, so don't call
        # the superconstructor (we don't want to kick off a new
        # process)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        """\
        Returns True if command_line contains '-r <our results dir>' as
        two complete arguments.  spacer is the separator between the
        arguments in command_line.
        """
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        # pad with the separator so that the arguments are also found
        # at the very start or end of the command line
        match = results_dir_arg in (spacer + command_line + spacer)
        if print_error and not match:
            print('%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line)))
        return match


    def _check_proc_fs(self, pid):
        """\
        Returns True if /proc/<pid>/cmdline can be read and names our
        results directory.
        """
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            try:
                cmdline = cmdline_file.read().strip()
            finally:
                cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def read_pidfile(self):
        """\
        Returns (pid, exit_status) as found in the pid file.  Both are
        None if the file does not exist or is still empty; exit_status
        is None if the file only has the pid line.

        Raises PidfileException if the file has more than two lines or
        a line is not an integer.
        """
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        try:
            lines = file_obj.readlines()
        finally:
            file_obj.close()

        if not lines:
            # the file has been created, but no pid is in it yet
            return None, None
        if len(lines) > 2:
            raise PidfileException(
                'Corrupt pid file: %d lines in %s' %
                (len(lines), self.pid_file))
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError:
            raise PidfileException('Corrupt pid file: ' +
                                   str(sys.exc_info()[1].args))

        return pid, exit_status


    def _find_autoserv_proc(self):
        """\
        Returns (pid, args) of a running autoserv whose command line
        names our results directory, or (None, None).
        """
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.items():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
         None, None if autoserv has not yet run
         pid,  None if autoserv is running
         pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print(message)
                email_manager.enqueue_notify_email(error,
                                                   message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Returns None, None until PIDFILE_TIMEOUT seconds have passed
        since this monitor was created; after that the process is
        given up on (and killed, if one is found).
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print(message)
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            email_manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1


    def exit_code(self):
        pid, exit_code = self.get_pidfile_info()
        return exit_code
803
804
class Agent(object):
    """\
    Runs a queue of tasks one after the other.  Every tick() either
    polls the active task or, when that is done, starts the next one.
    When a task fails, the remaining tasks are replaced by the
    failure_tasks of the failed task.
    """
    def __init__(self, tasks, queue_entry_ids=None):
        """\
        tasks: the tasks to run, in order.
        queue_entry_ids: ids of the queue entries this agent works on
                         (used by Dispatcher.get_agents); defaults to a
                         new empty list.
        """
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        # a fresh list per agent; a default of [] in the signature
        # would be one list shared by all agents
        if queue_entry_ids is None:
            queue_entry_ids = []
        self.queue_entry_ids = queue_entry_ids

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print("agent tick")
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print("agent picking task")
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        """Drop the tasks still queued and queue the failure tasks."""
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_started(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
862
mblighd5c95802008-03-05 00:33:46 +0000863
class AgentTask(object):
    """\
    One step of an Agent: optionally runs a command through a
    RunMonitor and records whether it succeeded (exit code 0).
    Subclasses hook in through prolog() and epilog().
    """
    def __init__(self, cmd, failure_tasks = None):
        """\
        cmd: the command to run, as a list of arguments; nothing is
             run if it is empty or None.
        failure_tasks: tasks the agent queues if this task fails;
                       defaults to a new empty list.
        """
        self.done = False
        # a fresh list per task; a default of [] in the signature would
        # be one list shared by all tasks
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None


    def poll(self):
        """Check on the command; a task without monitor has failed."""
        print("poll")
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        """Finish the task if exit_code is not None (still running)."""
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


    def cleanup(self):
        """Remove the temporary results directory, if one was made."""
        # hasattr, because create_temp_resultsdir() may never have been
        # called on this task
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def run(self):
        """\
        Start the command, if there is one.  For a task with a host
        attribute, output is logged to <RESULTS_DIR>/hosts/<hostname>.
        """
        if self.cmd:
            print("agent starting monitor")
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
                log_file = log_file)
mbligh36768f02008-02-22 18:28:33 +0000951
952
class RepairTask(AgentTask):
    """
    Task that runs autoserv with -R against one host, writing results to
    a temporary directory, and updates the host status from the outcome.
    """
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        self.create_temp_resultsdir('.repair')
        repair_cmd = [_autoserv_path, '-R', '-m', host.hostname,
                      '-r', self.temp_results_dir]
        self.fail_queue_entry = fail_queue_entry
        self.host = host
        super(RepairTask, self).__init__(repair_cmd)


    def prolog(self):
        """Mark the host as being repaired."""
        print("repair_task starting")
        self.host.set_status('Repairing')


    def epilog(self):
        """
        Set the host to 'Ready' on success.  On failure set it to
        'Repair Failed' and notify fail_queue_entry, if one was given.
        """
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
            return

        self.host.set_status('Repair Failed')
        if self.fail_queue_entry:
            self.fail_queue_entry.handle_host_failure()
mbligh36768f02008-02-22 18:28:33 +0000980
981
class VerifyTask(AgentTask):
    """
    Task that runs autoserv with -v against a host.  It is created either
    for a queue entry (using that entry's host) or for a bare host, never
    both.  A RepairTask for the host is registered as the failure task.
    """
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be supplied
        assert bool(queue_entry) != bool(host)

        self.queue_entry = queue_entry
        self.host = host or queue_entry.host

        self.create_temp_resultsdir('.verify')
        verify_cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
                      '-r', self.temp_results_dir]

        # only a non-metahost entry is handed to the repair task
        if queue_entry and not queue_entry.meta_host:
            entry_to_fail = queue_entry
        else:
            entry_to_fail = None
        repair = RepairTask(self.host, entry_to_fail)

        super(VerifyTask, self).__init__(verify_cmd,
                                         failure_tasks=[repair])


    def prolog(self):
        """
        Mark the queue entry (if any) and the host as 'Verifying' and
        clear the entry's verify results directory.
        """
        print("starting verify on %s" % (self.host.hostname))
        entry = self.queue_entry
        if entry:
            entry.set_status('Verifying')
            entry.clear_results_dir(entry.verify_results_dir())
        self.host.set_status('Verifying')


    def cleanup(self):
        """
        Move the temporary results next to the queue entry when there is
        one and either the verify succeeded or the entry is not a
        metahost entry, then remove the temporary directory.  Does
        nothing if the temporary directory is already gone.
        """
        if os.path.exists(self.temp_results_dir):
            entry = self.queue_entry
            if entry and (self.success or not entry.meta_host):
                self.move_results()
            super(VerifyTask, self).cleanup()


    def epilog(self):
        """
        On success mark the host 'Ready'; otherwise requeue the queue
        entry if there is one.
        """
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        """
        Move everything except the autoserv pidfile from the temporary
        results directory into the queue entry's verify results
        directory, creating that directory if needed.
        """
        assert self.queue_entry is not None
        destination = self.queue_entry.verify_results_dir()
        if not os.path.exists(destination):
            os.makedirs(destination)
        for name in os.listdir(self.temp_results_dir):
            if name != AUTOSERV_PID_FILE:
                self.force_move(
                    os.path.join(self.temp_results_dir, name),
                    os.path.join(destination, name))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print('Warning: removing existing destination file ' +
                  dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)
1054
1055
class VerifySynchronousTask(VerifyTask):
    """
    Verify task for one entry of a synchronous job.  After a successful
    verify the entry becomes 'Pending', and once the job reports it is
    ready the job is run and its agent handed to the dispatcher.
    """
    def __init__(self, queue_entry):
        super(VerifySynchronousTask, self).__init__(
            queue_entry=queue_entry)


    def epilog(self):
        super(VerifySynchronousTask, self).epilog()
        if not self.success:
            return

        entry = self.queue_entry
        if entry.job.num_complete() > 0:
            # some other entry failed verify, and we've
            # already been marked as stopped
            return

        entry.set_status('Pending')
        job = entry.job
        if job.is_ready():
            new_agent = job.run(entry)
            self.agent.dispatcher.add_agent(new_agent)
mblighe2586682008-02-29 22:45:46 +00001075
class QueueTask(AgentTask):
    """
    Task that runs a job's command for a set of queue entries under a
    PidfileRunMonitor, records timestamps in the results 'keyval' file
    and updates entry/host statuses before and after the run.
    """
    def __init__(self, job, queue_entries, cmd):
        super(QueueTask, self).__init__(cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(results_dir, field, value):
        """
        Append a 'field=value' line to results_dir/keyval.  The value is
        formatted with %d, i.e. written as an integer.  The file is
        closed even if the write fails.
        """
        key_path = os.path.join(results_dir, 'keyval')
        keyval_file = open(key_path, 'a')
        try:
            keyval_file.write('%s=%d\n' % (field, value))
        finally:
            keyval_file.close()


    def results_dir(self):
        """Return the results directory of the first queue entry."""
        return self.queue_entries[0].results_dir()


    def run(self):
        """\
        Override AgentTask.run() so we can use a PidfileRunMonitor.
        """
        self.monitor = PidfileRunMonitor(self.results_dir(),
                                         cmd=self.cmd,
                                         nice_level=AUTOSERV_NICE_LEVEL)


    def prolog(self):
        """
        Write the queued/started timestamps, mark every entry and its
        host 'Running', and for a non-synchronous multi-machine job add
        the entry's host to the job's machines file.
        """
        # write some job timestamps into the job keyval file
        queued = time.mktime(self.job.created_on.timetuple())
        started = time.time()
        self._write_keyval(self.results_dir(), "job_queued", queued)
        self._write_keyval(self.results_dir(), "job_started", started)
        for queue_entry in self.queue_entries:
            print("starting queue_task on %s/%s" % (
                queue_entry.host.hostname, queue_entry.id))
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
        if (not self.job.is_synchronous() and
            self.job.num_machines() > 1):
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def epilog(self):
        """
        Record the finish timestamp, set each entry to 'Completed' or
        'Failed' and its host to 'Ready', then trigger result parsing.
        """
        super(QueueTask, self).epilog()
        if self.success:
            status = 'Completed'
        else:
            status = 'Failed'

        # write another timestamp into the job keyval file
        finished = time.time()
        self._write_keyval(self.results_dir(), "job_finished", finished)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)
            queue_entry.host.set_status('Ready')

        if self.job.is_synchronous() or self.job.num_machines() == 1:
            # one shared results dir: parse only once the whole job is done
            if self.job.is_finished():
                parse_results(self.job.results_dir())
        else:
            for queue_entry in self.queue_entries:
                parse_results(queue_entry.results_dir(),
                              flags='-l 2')

        print("queue_task finished with %s/%s" % (status, self.success))
1143
1144
class RecoveryQueueTask(QueueTask):
    """
    QueueTask that attaches to an already existing run monitor instead
    of launching a new command.
    """
    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(
            job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        """Adopt the monitor supplied at construction time."""
        self.monitor = self.run_monitor


    def prolog(self):
        """Intentionally empty."""
        # recovering an existing process - don't do prolog
        pass
1159
1160
class RebootTask(AgentTask):
    """
    Task that runs autoserv with -b against a host; a RepairTask for the
    host is registered as the failure task.
    """
    def __init__(self, host):
        self.host = host
        self.create_temp_resultsdir('.reboot')
        # Current implementation of autoserv requires control file
        # to be passed on reboot action request. TODO: remove when no
        # longer appropriate.
        self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
                    '-r', self.temp_results_dir, '/dev/null']
        repair = RepairTask(host)
        super(RebootTask, self).__init__(self.cmd,
                                         failure_tasks=[repair])


    def prolog(self):
        """Mark the host as rebooting."""
        print("starting reboot task for host: %s" % self.host.hostname)
        self.host.set_status("Rebooting")
1179
mblighd5c95802008-03-05 00:33:46 +00001180
class AbortTask(AgentTask):
    """
    Task that aborts the active tasks of a set of agents on behalf of a
    queue entry.  The agents are removed from their dispatcher as soon as
    the task is constructed.
    """
    def __init__(self, queue_entry, agents_to_abort):
        self.queue_entry = queue_entry
        self.agents_to_abort = agents_to_abort
        for doomed_agent in agents_to_abort:
            doomed_agent.dispatcher.remove_agent(doomed_agent)
        # empty command: run() is overridden, nothing is launched
        super(AbortTask, self).__init__('')


    def prolog(self):
        """Mark the queue entry as 'Aborting'."""
        print("starting abort on host %s, job %s" % (
            self.queue_entry.host_id, self.queue_entry.job_id))
        self.queue_entry.set_status('Aborting')


    def epilog(self):
        """Mark the queue entry 'Aborted' and report success."""
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        """Abort the active task of every agent that has one."""
        for doomed_agent in self.agents_to_abort:
            running_task = doomed_agent.active_task
            if running_task:
                running_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001206
1207
class DBObject(object):
    """
    Minimal row wrapper: each column named in 'fields' becomes an
    instance attribute.  Subclasses supply the table name through
    _get_table().  All database access goes through the module-level
    _db object.
    """
    def __init__(self, fields, id=None, row=None, new_record=False):
        """
        fields: column names, in the same order as the values of a row.
        id: primary key to load from the database (when row is not given).
        row: already fetched row values (when id is not given).
        new_record: True if save() should INSERT this object.

        Raises ValueError if id is given and no such row exists.
        """
        # exactly one of id / row must be supplied
        assert (bool(id) != bool(row)) and fields

        self.__table = self._get_table()
        self.__fields = fields

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if len(rows) == 0:
                # string exceptions are not valid in current Python 2
                # releases; raise a real exception instead
                raise ValueError(
                    "row not found (table=%s, id=%s)" %
                    (self.__table, id))
            row = rows[0]

        assert len(row) == len(fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), fields, len(fields)))

        self.__valid_fields = {}
        for i, value in enumerate(row):
            self.__dict__[fields[i]] = value
            self.__valid_fields[fields[i]] = True

        # the id column may never be changed through update_field()
        del self.__valid_fields['id']


    @classmethod
    def _get_table(cls):
        """Return the table name; must be overridden."""
        raise NotImplementedError('Subclasses must override this')


    def count(self, where, table=None):
        """
        Return the number of rows matching the SQL condition 'where' in
        'table' (default: this object's table).  'where' is inserted
        into the query text as-is.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
            SELECT count(*) FROM %s
            WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def num_cols(self):
        """Return the number of fields of this object."""
        return len(self.__fields)


    def update_field(self, field, value):
        """
        Set 'field' to 'value' in the database and on this object.
        Nothing is written if the value is unchanged.  Unknown fields
        (and 'id') raise KeyError from the valid-field lookup.
        """
        assert self.__valid_fields[field]

        if self.__dict__[field] == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % \
                (self.__table, field)
        _db.execute(query, (value, self.id))

        self.__dict__[field] = value


    def save(self):
        """
        INSERT this object if it was created with new_record set; do
        nothing otherwise.  The first field (the id) is not written.
        Values are passed as query parameters rather than formatted
        into the SQL text, so quoting is left to the database layer.
        """
        if self.__new_record:
            keys = self.__fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            placeholders = ','.join(['%s'] * len(keys))
            values = tuple([self.__dict__[key] for key in keys])
            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
                    (self.__table, columns, placeholders)
            _db.execute(query, values)


    def delete(self):
        """Delete this object's row from the database."""
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @classmethod
    def fetch(cls, where, params=()):
        """
        Generate one instance of cls per row matching the SQL condition
        'where'; 'params' are passed to the query as parameters.
        """
        rows = _db.execute(
            'SELECT * FROM %s WHERE %s' % (cls._get_table(), where),
            params)
        for row in rows:
            yield cls(row=row)
1296
mbligh36768f02008-02-22 18:28:33 +00001297
class IneligibleHostQueue(DBObject):
    """Row of the ineligible_host_queues table (id, job_id, host_id)."""
    def __init__(self, id=None, row=None, new_record=None):
        columns = ['id', 'job_id', 'host_id']
        super(IneligibleHostQueue, self).__init__(
            columns, id=id, row=row, new_record=new_record)


    @classmethod
    def _get_table(cls):
        """Name of the backing table."""
        return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001308
1309
class Host(DBObject):
    """Row of the hosts table plus queue lookups for that host."""
    def __init__(self, id=None, row=None):
        fields = ['id', 'hostname', 'locked', 'synch_id', 'status',
                  'invalid']
        super(Host, self).__init__(fields, id=id, row=row)


    @classmethod
    def _get_table(cls):
        """Name of the backing table."""
        return 'hosts'


    def current_task(self):
        """
        Return the active, not yet complete HostQueueEntry for this
        host, or None if there is none.  At most one such row is
        expected.
        """
        rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
            """, (self.id,))

        if len(rows) == 0:
            return None
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def next_queue_entries(self):
        """
        Return a list holding the next queue entry this host could run,
        or None if the host is locked or nothing is waiting.

        Candidates are inactive, incomplete entries that either name
        this host directly or carry a meta_host label this host has,
        excluding jobs this host is blocked from.  The query is limited
        to a single row.
        """
        if self.locked:
            print("%s locked, not queuing" % self.hostname)
            return None
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE ((host_id=%s) OR (meta_host IS NOT null AND
            (meta_host IN (
                SELECT label_id FROM hosts_labels WHERE host_id=%s
                )
            )
            AND job_id NOT IN (
                SELECT job_id FROM ineligible_host_queues
                WHERE host_id=%s
            )))
            AND NOT complete AND NOT active
            ORDER BY priority DESC, meta_host, id
            LIMIT 1
        """, (self.id, self.id, self.id))

        if len(rows) == 0:
            return None
        return [HostQueueEntry(row=i) for i in rows]


    def yield_work(self):
        """
        Requeue the entry currently active on this host, if any.

        The current entry is looked up once and reused, so the entry
        that is checked is the same one that is requeued.
        """
        print("%s yielding work" % self.hostname)
        entry = self.current_task()
        if entry:
            entry.requeue()


    def set_status(self, status):
        """Log and store a new status for this host."""
        print('%s -> %s' % (self.hostname, status))
        self.update_field('status', status)
1370
1371
class HostQueueEntry(DBObject):
    """
    Row of the host_queue_entries table.  Loading an entry also loads
    its Job and, when host_id is set, its Host.
    """
    def __init__(self, id=None, row=None):
        assert id or row
        fields = ['id', 'job_id', 'host_id', 'priority', 'status',
                  'meta_host', 'active', 'complete']
        super(HostQueueEntry, self).__init__(fields, id=id, row=row)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.results_dir(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def _get_table(cls):
        """Name of the backing table."""
        return 'host_queue_entries'


    def set_host(self, host):
        """
        Assign 'host' to this entry, or release the current host when
        'host' is false.

        Assigning marks the entry active and blocks the host from the
        job; releasing removes that block and clears host_id.  Releasing
        when no host is assigned skips the unblock step.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            if self.host is not None:
                self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the Host assigned to this entry (may be None)."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        queue_log = open(self.queue_log_path, 'a', 0)
        try:
            queue_log.write(now + ' ' + log_line + '\n')
        finally:
            queue_log.close()


    def block_host(self, host_id):
        """Record that host_id must not be given this entry's job."""
        print("creating block %s/%s" % (self.job.id, host_id))
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete every block of host_id for this entry's job."""
        print("removing block %s/%s" % (self.job.id, host_id))
        blocks = IneligibleHostQueue.fetch(
            'job_id=%s and host_id=%s',
            params=(self.job.id, host_id))
        for block in blocks:
            block.delete()


    def results_dir(self):
        """
        Return the job directory for synchronous or single-machine
        jobs, otherwise the per-host subdirectory of the job directory.
        """
        if self.job.is_synchronous() or self.job.num_machines() == 1:
            return self.job.job_dir
        else:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)


    def verify_results_dir(self):
        """
        Return the per-host subdirectory for synchronous or
        multi-machine jobs, otherwise the job directory.
        """
        if self.job.is_synchronous() or self.job.num_machines() > 1:
            assert self.host
            return os.path.join(self.job.job_dir,
                                self.host.hostname)
        else:
            return self.job.job_dir


    def set_status(self, status):
        """
        Store 'status' and update the complete/active flags that go
        with it.  Statuses not listed below leave the flags unchanged.
        """
        self.update_field('status', status)
        if self.host:
            hostname = self.host.hostname
        else:
            hostname = 'no host'
        print("%s/%d status -> %s" % (hostname, self.id, self.status))
        if status in ['Queued']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Abort', 'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)


    def run(self, assigned_host=None):
        """
        Start this entry and return the agent produced by the job.  A
        metahost entry must be given the host it was assigned.
        """
        if self.meta_host:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.job.create_results_dir()
            self.set_host(assigned_host)

        print("%s/%s scheduled on %s, status=%s" % (self.job.name,
              self.meta_host, self.host.hostname, self.status))

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return the entry to 'Queued'; metahost entries drop their host."""
        self.set_status('Queued')

        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        if self.job.is_synchronous():
            self.job.stop_all_entries()


    def clear_results_dir(self, results_dir=None, dont_delete_files=False):
        """
        Empty results_dir (default: this entry's results directory).
        With dont_delete_files the contents are moved to a new
        temporary directory instead of being removed.  Does nothing if
        the directory does not exist.
        """
        results_dir = results_dir or self.results_dir()
        if not os.path.exists(results_dir):
            return
        if dont_delete_files:
            temp_dir = tempfile.mkdtemp(suffix='.clear_results')
            print('Moving results from %s to %s' % (results_dir,
                                                    temp_dir))
        for filename in os.listdir(results_dir):
            path = os.path.join(results_dir, filename)
            if dont_delete_files:
                shutil.move(path,
                            os.path.join(temp_dir, filename))
            else:
                remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001518
1519
class Job(DBObject):
    """
    Row of the jobs table.  job_dir is RESULTS_DIR/<id>-<owner>.  run()
    builds the agent (verify and/or queue tasks) that executes the job
    for a queue entry.
    """
    def __init__(self, id=None, row=None):
        assert id or row
        super(Job, self).__init__(
            ['id', 'owner', 'name', 'priority',
             'control_file', 'control_type', 'created_on',
             'synch_type', 'synch_count', 'synchronizing'],
            id=id, row=row)

        self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
                                                            self.owner))


    @classmethod
    def _get_table(cls):
        """Name of the backing table."""
        return 'jobs'


    def is_server_job(self):
        """Return True unless control_type is 2."""
        return self.control_type != 2


    def get_host_queue_entries(self):
        """Return all queue entries of this job (at least one)."""
        rows = _db.execute("""
            SELECT * FROM host_queue_entries
            WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """
        Store 'status' on the job and, if update_queues is set, on every
        queue entry of the job.
        """
        # NOTE(review): 'status' is not among the fields this class
        # passes to DBObject.__init__, so DBObject.update_field's
        # valid-field lookup would fail here - confirm against the
        # jobs table schema.
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_synchronous(self):
        """Return True when synch_type is 2."""
        return self.synch_type == 2


    def is_ready(self):
        """
        Non-synchronous jobs are always ready.  A synchronous job is
        ready when the number of its 'Pending' entries equals
        synch_count.
        """
        if not self.is_synchronous():
            return True
        sql = "job_id=%s AND status='Pending'" % self.id
        count = self.count(sql, table='host_queue_entries')
        return (count == self.synch_count)


    def ready_to_synchronize(self):
        """
        Heuristic: True when at least half of synch_count entries are
        'Pending'.  The ratio is computed in floating point; integer
        division would truncate it to 0 for anything below synch_count.
        """
        # heuristic
        queue_entries = self.get_host_queue_entries()
        count = 0
        for queue_entry in queue_entries:
            if queue_entry.status == 'Pending':
                count += 1

        return (float(count) / self.synch_count >= 0.5)


    def start_synchronizing(self):
        """Set the synchronizing flag."""
        self.update_field('synchronizing', True)


    def results_dir(self):
        """Return the job's results directory."""
        return self.job_dir


    def num_machines(self, clause=None):
        """
        Count this job's queue entries, optionally restricted by the
        extra SQL condition 'clause'.
        """
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries that are not complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of active entries."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of complete entries."""
        return self.num_machines('complete')


    def is_finished(self):
        """Return True when no entry is left incomplete."""
        left = self.num_queued()
        print("%s: %s machines left" % (self.name, left))
        return left == 0


    def stop_synchronizing(self):
        """Clear the synchronizing flag and set the job to 'Queued'."""
        self.update_field('synchronizing', False)
        self.set_status('Queued', update_queues=False)


    def stop_all_entries(self):
        """Set every incomplete entry of this job to 'Stopped'."""
        for child_entry in self.get_host_queue_entries():
            if not child_entry.complete:
                child_entry.set_status('Stopped')


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to job_dir/.machines."""
        hostname = queue_entry.get_host().hostname
        print("writing %s to job %s machines file" % (hostname, self.id))
        file_path = os.path.join(self.job_dir, '.machines')
        mf = open(file_path, 'a')
        try:
            mf.write("%s\n" % hostname)
        finally:
            mf.close()


    def create_results_dir(self, queue_entry=None):
        """
        Make sure job_dir exists.  Returns the queue entry's results
        directory when an entry is given, otherwise job_dir.
        """
        print("create: active: %s complete %s" % (self.num_active(),
                                                  self.num_complete()))

        if not os.path.exists(self.job_dir):
            os.makedirs(self.job_dir)

        if queue_entry:
            return queue_entry.results_dir()
        return self.job_dir


    def run(self, queue_entry):
        """
        Build and return the Agent that runs this job for queue_entry.

        A synchronous job that is not yet ready only gets an agent with
        a VerifySynchronousTask for the entry.  Otherwise the control
        file is written to a temporary file, the autoserv command line
        is assembled, and the agent gets a QueueTask, preceded by a
        VerifyTask for non-synchronous jobs.
        """
        results_dir = self.create_results_dir(queue_entry)

        if self.is_synchronous():
            if not self.is_ready():
                return Agent([VerifySynchronousTask(
                                  queue_entry=queue_entry)],
                             [queue_entry.id])

        queue_entry.set_status('Starting')

        # mkstemp creates the file atomically (os.tmpnam only returns a
        # name that can be raced).  The file is left on disk because
        # its path is handed to autoserv below.
        ctrl_fd, ctrl_path = tempfile.mkstemp()
        ctrl = os.fdopen(ctrl_fd, 'w')
        try:
            ctrl.write(self.control_file or "")
        finally:
            ctrl.close()

        if self.is_synchronous():
            queue_entries = self.get_host_queue_entries()
        else:
            assert queue_entry
            queue_entries = [queue_entry]
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        # determine the job tag
        if self.is_synchronous() or self.num_machines() == 1:
            job_name = "%s-%s" % (self.id, self.owner)
        else:
            job_name = "%s-%s/%s" % (self.id, self.owner,
                                     hostnames)

        params = [_autoserv_path, '-P', job_name, '-p', '-n',
                  '-r', os.path.abspath(results_dir),
                  '-b', '-u', self.owner, '-l', self.name,
                  '-m', hostnames, ctrl_path]

        if not self.is_server_job():
            params.append('-c')

        tasks = []
        if not self.is_synchronous():
            tasks.append(VerifyTask(queue_entry))

        tasks.append(QueueTask(job=self,
                               queue_entries=queue_entries,
                               cmd=params))

        ids = [entry.id for entry in queue_entries]

        agent = Agent(tasks, ids)

        return agent
1704
1705
# Start the scheduler only when this file is executed as a script.
if __name__ == "__main__":
    main()