#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, MySQLdb, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections, utils


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

AUTOSERV_PID_FILE = '.autoserv_execute'
# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

_db = None
_shutdown = False
_notify_email = None
_autoserv_path = 'autoserv'
_testing_mode = False
_global_config_section = 'SCHEDULER'
_base_url = None
# see os.getlogin() online docs
_email_from = pwd.getpwuid(os.getuid())[0]

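# For reference, the global_config settings this module reads elsewhere in the
# file; the stanza below is only an illustration with made-up values, not a
# shipped configuration:
#
#   [SCHEDULER]
#   notify_email: someone@example.com
#   tick_pause_sec: 5
#   clean_interval_minutes: 60
#   max_running_jobs: 100
#   max_jobs_started_per_cycle: 10
#   max_parse_processes: 5
#
#   [AUTOTEST_WEB]
#   host: localhost
#   database: autotest_web
#   user: autotest
#   password: some_password
#   base_url: http://your_autotest_server/afe/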
def main():
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to.  Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # read in notify_email from global_config
    c = global_config.global_config
    global _notify_email
    val = c.get_config_value(_global_config_section, "notify_email")
    if val != "":
        _notify_email = val

    tick_pause = c.get_config_value(
        _global_config_section, 'tick_pause_sec', type=int)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # read in base url
    global _base_url
    val = c.get_config_value("AUTOTEST_WEB", "base_url")
    if val:
        _base_url = val
    else:
        _base_url = "http://your_autotest_server/afe/"

    init(options.logfile)
    dispatcher = Dispatcher()
    dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

    try:
        while not _shutdown:
            dispatcher.tick()
            time.sleep(tick_pause)
    except:
        log_stacktrace("Uncaught exception; terminating monitor_db")

    email_manager.send_queued_emails()
    _db.disconnect()


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    print "Shutdown request received."


def init(logfile):
    if logfile:
        enable_logging(logfile)
    print "%s> dispatcher starting" % time.strftime("%X %x")
    print "My PID is %d" % os.getpid()

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = DatabaseConn()
    _db.connect()

    print "Setting signal handler"
    signal.signal(signal.SIGINT, handle_sigint)

    print "Connected! Running..."


def enable_logging(logfile):
    out_file = logfile
    err_file = "%s.err" % logfile
    print "Enabling logging to %s (%s)" % (out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd


def queue_entries_to_abort():
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)
    qe = [HostQueueEntry(row=i) for i in rows]
    return qe


def remove_file_or_dir(path):
    if stat.S_ISDIR(os.stat(path).st_mode):
        # directory
        shutil.rmtree(path)
    else:
        # file
        os.remove(path)

class DatabaseConn:
    def __init__(self):
        self.reconnect_wait = 20
        self.conn = None
        self.cur = None

        import MySQLdb.converters
        self.convert_dict = MySQLdb.converters.conversions
        self.convert_dict.setdefault(bool, self.convert_boolean)


    @staticmethod
    def convert_boolean(boolean, conversion_dict):
        'Convert booleans to integer strings'
        return str(int(boolean))


    def connect(self, db_name=None):
        self.disconnect()

        # get global config and parse for info
        c = global_config.global_config
        dbase = "AUTOTEST_WEB"
        db_host = c.get_config_value(dbase, "host")
        if db_name is None:
            db_name = c.get_config_value(dbase, "database")

        if _testing_mode:
            db_name = 'stresstest_autotest_web'

        db_user = c.get_config_value(dbase, "user")
        db_pass = c.get_config_value(dbase, "password")

        while not self.conn:
            try:
                self.conn = MySQLdb.connect(
                    host=db_host, user=db_user, passwd=db_pass,
                    db=db_name, conv=self.convert_dict)

                self.conn.autocommit(True)
                self.cur = self.conn.cursor()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "Can't connect to MYSQL; reconnecting"
                time.sleep(self.reconnect_wait)
                self.disconnect()


    def disconnect(self):
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None


    def execute(self, *args, **dargs):
        while True:
            try:
                self.cur.execute(*args, **dargs)
                return self.cur.fetchall()
            except MySQLdb.OperationalError:
                traceback.print_exc()
                print "MYSQL connection died; reconnecting"
                time.sleep(self.reconnect_wait)
                self.connect()


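# A typical DatabaseConn round-trip, as done in init() and the query helpers
# above (sketch only; execute() returns the fetched rows and transparently
# reconnects if the MySQL connection drops):
#
#   _db = DatabaseConn()
#   _db.connect()
#   rows = _db.execute("SELECT * FROM host_queue_entries WHERE active")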
def generate_parse_command(results_dir, flags=""):
    parse = os.path.abspath(os.path.join(AUTOTEST_TKO_DIR, 'parse'))
    output = os.path.abspath(os.path.join(results_dir, '.parse.log'))
    cmd = "%s %s -r -o %s > %s 2>&1 &"
    return cmd % (parse, flags, results_dir, output)


_parse_command_queue = []
def parse_results(results_dir, flags=""):
    if _testing_mode:
        return
    _parse_command_queue.append(generate_parse_command(results_dir, flags))


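# For reference, generate_parse_command() above produces a detached shell
# command of the form (paths are placeholders):
#
#   <AUTOTEST_TKO_DIR>/parse [flags] -r -o <results_dir> \
#       > <results_dir>/.parse.log 2>&1 &
#
# The dispatcher later pops these commands off _parse_command_queue and runs
# them with os.system(), throttled by max_parse_processes.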
def log_stacktrace(reason):
    (exc_type, value, tb) = sys.exc_info()
    message = "EXCEPTION: %s\n" % reason
    message += ''.join(traceback.format_exception(exc_type, value, tb))

    sys.stderr.write("\n%s\n" % message)
    email_manager.enqueue_notify_email("monitor_db exception", message)


def get_proc_poll_fn(pid):
    proc_path = os.path.join('/proc', str(pid))
    def poll_fn():
        if os.path.exists(proc_path):
            return None
        return 0 # we can't get a real exit code
    return poll_fn


def send_email(from_addr, to_string, subject, body):
    """Mails out emails to the addresses listed in to_string.

    to_string is split into a list which can be delimited by any of:
            ';', ',', ':' or any whitespace
    """

    # Create list from string removing empty strings from the list.
    to_list = [x for x in re.split(r'\s|,|;|:', to_string) if x]
    if not to_list:
        return

    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (
        from_addr, ', '.join(to_list), subject, body)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(from_addr, to_list, msg)
        finally:
            mailer.quit()
    except Exception, e:
        print "Sending email failed. Reason: %s" % repr(e)


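# Example of send_email()'s address splitting (hypothetical addresses):
#
#   send_email('scheduler@example.com',
#              'a@example.com, b@example.com; c@example.com',
#              'subject', 'body')
#
# delivers to ['a@example.com', 'b@example.com', 'c@example.com'].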
def kill_autoserv(pid, poll_fn=None):
    print 'killing', pid
    if poll_fn is None:
        poll_fn = get_proc_poll_fn(pid)
    if poll_fn() is None:
        os.kill(pid, signal.SIGCONT)
        os.kill(pid, signal.SIGTERM)


class EmailNotificationManager(object):
    def __init__(self):
        self._emails = []

    def enqueue_notify_email(self, subject, message):
        if not _notify_email:
            return

        body = 'Subject: ' + subject + '\n'
        body += "%s / %s / %s\n%s" % (socket.gethostname(),
                                      os.getpid(),
                                      time.strftime("%X %x"), message)
        self._emails.append(body)


    def send_queued_emails(self):
        if not self._emails:
            return
        subject = 'Scheduler notifications from ' + socket.gethostname()
        separator = '\n' + '-' * 40 + '\n'
        body = separator.join(self._emails)

        send_email(_email_from, _notify_email, subject, body)
        self._emails = []

email_manager = EmailNotificationManager()


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active = TRUE)',
            where="active_hqe.host_id IS NULL "
                  "AND hosts.locked = FALSE "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = long(row[0]), long(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


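    # Illustration of _process_many2many_dict() above (made-up ids): rows of
    # (job_id, acl_group_id) pairs such as [(1, 10), (1, 11), (2, 10)] become
    # {1: set([10, 11]), 2: set([10])}; flip=True swaps the key and value
    # columns before grouping.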
    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.acl_group_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, acl_group_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        acl = self._is_acl_accessible(host_id, queue_entry)
        deps = self._check_job_dependencies(job_dependencies, host_labels)
        only_if = self._check_only_if_needed_labels(job_dependencies,
                                                    host_labels, queue_entry)
        return acl and deps and only_if


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            return self._schedule_non_metahost(queue_entry)
        return self._schedule_metahost(queue_entry)


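# In short: the dispatcher's _schedule_new_jobs() hands the pending queue
# entries to HostScheduler.refresh(), which snapshots ready hosts, ACLs,
# labels and dependencies once per tick; find_eligible_host() then either
# checks a non-metahost entry's pre-assigned host, or walks the metahost's
# label for the first host that is usable, not ineligible and eligible for
# the job.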
class Dispatcher:
    autoserv_procs_cache = None
    max_running_processes = global_config.global_config.get_config_value(
        _global_config_section, 'max_running_jobs', type=int)
    max_processes_started_per_cycle = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_jobs_started_per_cycle', type=int))
    clean_interval = (
        global_config.global_config.get_config_value(
            _global_config_section, 'clean_interval_minutes', type=int))
    max_parse_processes = (
        global_config.global_config.get_config_value(
            _global_config_section, 'max_parse_processes', type=int))

    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()


    def do_initial_recovery(self, recover_hosts=True):
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        Dispatcher.autoserv_procs_cache = None
        if self._last_clean_time + self.clean_interval * 60 < time.time():
            self._abort_timed_out_jobs()
            self._clear_inactive_blocks()
            self._last_clean_time = time.time()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        self._run_final_parses()
        email_manager.send_queued_emails()


    def _run_final_parses(self):
        process_count = 0
        try:
            for line in utils.system_output('ps -e').splitlines():
                if 'parse.py' in line:
                    process_count += 1
        except Exception:
            # We'll try again in a bit.  This is a work-around for a case
            # where the scheduler crashed on an "Interrupted system call".
            return

        if process_count:
            print "%d parses currently running" % process_count

        while (process_count < self.max_parse_processes and
               _parse_command_queue):
            cmd = _parse_command_queue.pop(0)
            print "Starting another final parse with cmd %s" % cmd
            os.system(cmd)
            process_count += 1

        if _parse_command_queue:
            print ("%d cmds still in final parse queue" %
                   len(_parse_command_queue))


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self

    # Find the agents corresponding to the specified queue_entry
    def get_agents(self, queue_entry):
        res_agents = []
        for agent in self._agents:
            if queue_entry.id in agent.queue_entry_ids:
                res_agents.append(agent)
        return res_agents


    def remove_agent(self, agent):
        self._agents.remove(agent)


    def num_running_processes(self):
        return sum(agent.num_processes for agent in self._agents
                   if agent.is_running())


    @classmethod
    def find_autoservs(cls, orphans_only=False):
        """\
        Returns a dict mapping pids to command lines for root autoserv
        processes.  If orphans_only=True, return only processes that
        have been orphaned (i.e. parent pid = 1).
        """
        if cls.autoserv_procs_cache is not None:
            return cls.autoserv_procs_cache

        proc = subprocess.Popen(
            ['/bin/ps', 'x', '-o', 'pid,pgid,ppid,comm,args'],
            stdout=subprocess.PIPE)
        # split each line into the five columns output by ps
        procs = [line.split(None, 4) for line in
                 proc.communicate()[0].splitlines()]
        autoserv_procs = {}
        for proc in procs:
            # check ppid == 1 for orphans (ps fields are strings)
            if orphans_only and proc[2] != '1':
                continue
            # only root autoserv processes have pgid == pid
            if (proc[3] == 'autoserv' and # comm
                proc[1] == proc[0]):      # pgid == pid
                # map pid to args
                autoserv_procs[int(proc[0])] = proc[4]
        cls.autoserv_procs_cache = autoserv_procs
        return autoserv_procs


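    # Example of what find_autoservs() above parses (shape of the output of
    # 'ps x -o pid,pgid,ppid,comm,args'; the numbers are made up):
    #
    #     PID  PGID  PPID COMMAND  COMMAND
    #    4321  4321     1 autoserv /usr/bin/python autoserv -r /results/5-u ...
    #
    # pid == pgid marks a root autoserv process, and ppid == 1 marks it as
    # orphaned.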
    def recover_queue_entry(self, queue_entry, run_monitor):
        job = queue_entry.job
        if job.is_synchronous():
            all_queue_entries = job.get_host_queue_entries()
        else:
            all_queue_entries = [queue_entry]
        all_queue_entry_ids = [queue_entry.id for queue_entry
                               in all_queue_entries]
        queue_task = RecoveryQueueTask(
            job=queue_entry.job,
            queue_entries=all_queue_entries,
            run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             queue_entry_ids=all_queue_entry_ids))


    def _recover_processes(self):
        orphans = self.find_autoservs(orphans_only=True)

        # first, recover running queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status = 'Running'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        requeue_entries = []
        recovered_entry_ids = set()
        for queue_entry in queue_entries:
            run_monitor = PidfileRunMonitor(
                queue_entry.results_dir())
            pid, exit_code = run_monitor.get_pidfile_info()
            if pid is None:
                # autoserv apparently never got run, so requeue
                requeue_entries.append(queue_entry)
                continue
            if queue_entry.id in recovered_entry_ids:
                # synchronous job we've already recovered
                continue
            print 'Recovering queue entry %d (pid %d)' % (
                queue_entry.id, pid)
            job = queue_entry.job
            if job.is_synchronous():
                for entry in job.get_host_queue_entries():
                    assert entry.active
                    recovered_entry_ids.add(entry.id)
            self.recover_queue_entry(queue_entry,
                                     run_monitor)
            orphans.pop(pid, None)

        # and requeue other active queue entries
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE active AND NOT complete
                              AND status != 'Running'
                              AND status != 'Pending'
                              AND status != 'Abort'
                              AND status != 'Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries + requeue_entries:
            print 'Requeuing running QE %d' % queue_entry.id
            queue_entry.clear_results_dir(dont_delete_files=True)
            queue_entry.requeue()


        # now kill any remaining autoserv processes
        for pid in orphans.keys():
            print 'Killing orphan %d (%s)' % (pid, orphans[pid])
            kill_autoserv(pid)

        # recover aborting tasks
        rebooting_host_ids = set()
        rows = _db.execute("""SELECT * FROM host_queue_entries
                              WHERE status='Abort' or status='Aborting'""")
        queue_entries = [HostQueueEntry(row=i) for i in rows]
        for queue_entry in queue_entries:
            print 'Recovering aborting QE %d' % queue_entry.id
            queue_host = queue_entry.get_host()
            reboot_task = RebootTask(queue_host)
            verify_task = VerifyTask(host = queue_host)
            self.add_agent(Agent(tasks=[reboot_task,
                                        verify_task],
                                 queue_entry_ids=[queue_entry.id]))
            queue_entry.set_status('Aborted')
            # Secure the host from being picked up
            queue_host.set_status('Rebooting')
            rebooting_host_ids.add(queue_host.id)

        # reverify hosts that were in the middle of verify, repair or
        # reboot
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Rebooting')""",
                                   exclude_ids=rebooting_host_ids)

        # finally, recover "Running" hosts with no active queue entries,
        # although this should never happen
        message = ('Recovering running host %s - this probably '
                   'indicates a scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              exclude_ids=set()):
        rows = _db.execute('SELECT * FROM hosts WHERE locked = 0 AND '
                           'invalid = 0 AND ' + where)
        hosts = [Host(row=i) for i in rows]
        for host in hosts:
            if host.id in exclude_ids:
                continue
            if print_message is not None:
                print print_message % host.hostname
            verify_task = VerifyTask(host = host)
            self.add_agent(Agent(tasks = [verify_task]))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        update = """
        UPDATE host_queue_entries INNER JOIN jobs
        ON host_queue_entries.job_id = jobs.id"""
        timed_out = ' AND jobs.created_on + INTERVAL jobs.timeout HOUR < NOW()'

        _db.execute(update + """
            SET host_queue_entries.status = 'Abort'
            WHERE host_queue_entries.active IS TRUE""" + timed_out)

        _db.execute(update + """
            SET host_queue_entries.status = 'Aborted',
                host_queue_entries.active = FALSE,
                host_queue_entries.complete = TRUE
            WHERE host_queue_entries.active IS FALSE
            AND host_queue_entries.complete IS FALSE""" + timed_out)


    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            where='NOT complete AND NOT active',
            order_by='priority DESC, meta_host, id'))


    def _schedule_new_jobs(self):
        print "finding work"

        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return

        self._host_scheduler.refresh(queue_entries)

        for queue_entry in queue_entries:
            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
            if not assigned_host:
                continue
            self._run_queue_entry(queue_entry, assigned_host)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run(assigned_host=host)
        self.add_agent(agent)


    def _find_aborting(self):
        num_aborted = 0
        # Find jobs that are aborting
        for entry in queue_entries_to_abort():
            agents_to_abort = self.get_agents(entry)
            entry_host = entry.get_host()
            reboot_task = RebootTask(entry_host)
            verify_task = VerifyTask(host = entry_host)
            tasks = [reboot_task, verify_task]
            if agents_to_abort:
                abort_task = AbortTask(entry, agents_to_abort)
                for agent in agents_to_abort:
                    self.remove_agent(agent)
                tasks.insert(0, abort_task)
            else:
                entry.set_status('Aborted')
                # just to make sure this host does not get
                # taken away
                entry_host.set_status('Rebooting')
            self.add_agent(Agent(tasks=tasks,
                                 queue_entry_ids = [entry.id]))
            num_aborted += 1
            if num_aborted >= 50:
                break


    def _can_start_agent(self, agent, num_running_processes,
                         num_started_this_cycle, have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if (num_running_processes + agent.num_processes >
            self.max_running_processes):
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            self.max_processes_started_per_cycle):
            return False
        return True


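    # A worked example of the throttling in _can_start_agent() above (numbers
    # are hypothetical): with max_running_jobs = 100 and
    # max_jobs_started_per_cycle = 10, an agent needing 4 processes starts
    # only if the running total would stay at or below 100 and the processes
    # started this tick would stay at or below 10; once any agent is refused,
    # have_reached_limit blocks every later nonzero-process agent for the rest
    # of the tick.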
    def _handle_agents(self):
        num_running_processes = self.num_running_processes()
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                print "agent finished"
                self._agents.remove(agent)
                num_running_processes -= agent.num_processes
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_running_processes,
                                             num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_running_processes += agent.num_processes
                num_started_this_cycle += agent.num_processes
            agent.tick()
        print num_running_processes, 'running processes'


class RunMonitor(object):
    def __init__(self, cmd, nice_level = None, log_file = None):
        self.nice_level = nice_level
        self.log_file = log_file
        self.cmd = cmd

    def run(self):
        if self.nice_level:
            nice_cmd = ['nice', '-n', str(self.nice_level)]
            nice_cmd.extend(self.cmd)
            self.cmd = nice_cmd

        out_file = None
        if self.log_file:
            try:
                os.makedirs(os.path.dirname(self.log_file))
            except OSError, exc:
                if exc.errno != errno.EEXIST:
                    log_stacktrace(
                        'Unexpected error creating logfile '
                        'directory for %s' % self.log_file)
            try:
                out_file = open(self.log_file, 'a')
                out_file.write("\n%s\n" % ('*' * 80))
                out_file.write("%s> %s\n" %
                               (time.strftime("%X %x"),
                                self.cmd))
                out_file.write("%s\n" % ('*' * 80))
            except (OSError, IOError):
                log_stacktrace('Error opening log file %s' %
                               self.log_file)

        if not out_file:
            out_file = open('/dev/null', 'w')

        in_devnull = open('/dev/null', 'r')
        print "cmd = %s" % self.cmd
        print "path = %s" % os.getcwd()

        self.proc = subprocess.Popen(self.cmd, stdout=out_file,
                                     stderr=subprocess.STDOUT,
                                     stdin=in_devnull)
        out_file.close()
        in_devnull.close()


    def get_pid(self):
        return self.proc.pid


    def kill(self):
        kill_autoserv(self.get_pid(), self.exit_code)


    def exit_code(self):
        return self.proc.poll()


class PidfileException(Exception):
    """\
    Raised when there's some unexpected behavior with the pid file.
    """


class PidfileRunMonitor(RunMonitor):
    def __init__(self, results_dir, cmd=None, nice_level=None,
                 log_file=None):
        self.results_dir = os.path.abspath(results_dir)
        self.pid_file = os.path.join(results_dir, AUTOSERV_PID_FILE)
        self.lost_process = False
        self.start_time = time.time()
        super(PidfileRunMonitor, self).__init__(cmd, nice_level, log_file)


    def get_pid(self):
        pid, exit_status = self.get_pidfile_info()
        assert pid is not None
        return pid


    def _check_command_line(self, command_line, spacer=' ',
                            print_error=False):
        results_dir_arg = spacer.join(('', '-r', self.results_dir, ''))
        match = results_dir_arg in command_line
        if print_error and not match:
            print '%s not found in %s' % (repr(results_dir_arg),
                                          repr(command_line))
        return match


    def _check_proc_fs(self, pid):
        cmdline_path = os.path.join('/proc', str(pid), 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'r')
            cmdline = cmdline_file.read().strip()
            cmdline_file.close()
        except IOError:
            return False
        # /proc/.../cmdline has \x00 separating args
        return self._check_command_line(cmdline, spacer='\x00',
                                        print_error=True)


    def read_pidfile(self):
        if not os.path.exists(self.pid_file):
            return None, None
        file_obj = open(self.pid_file, 'r')
        lines = file_obj.readlines()
        file_obj.close()
        assert 1 <= len(lines) <= 2
        try:
            pid = int(lines[0])
            exit_status = None
            if len(lines) == 2:
                exit_status = int(lines[1])
        except ValueError, exc:
            raise PidfileException('Corrupt pid file: ' +
                                   str(exc.args))

        return pid, exit_status


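    # The .autoserv_execute pidfile read by read_pidfile() above holds one or
    # two lines written by autoserv: the pid on the first line, and the exit
    # status on a second line once the run finishes.  For example (made-up
    # values):
    #
    #   4321
    #   0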
    def _find_autoserv_proc(self):
        autoserv_procs = Dispatcher.find_autoservs()
        for pid, args in autoserv_procs.iteritems():
            if self._check_command_line(args):
                return pid, args
        return None, None


    def get_pidfile_info(self):
        """\
        Returns:
           None, None if autoserv has not yet run
           pid, None if autoserv is running
           pid, exit_status if autoserv has completed
        """
        if self.lost_process:
            return self.pid, self.exit_status

        pid, exit_status = self.read_pidfile()

        if pid is None:
            return self._handle_no_pid()

        if exit_status is None:
            # double check whether or not autoserv is running
            proc_running = self._check_proc_fs(pid)
            if proc_running:
                return pid, exit_status

            # pid but no process - maybe process *just* exited
            pid, exit_status = self.read_pidfile()
            if exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                error = ('autoserv died without writing exit '
                         'code')
                message = error + '\nPid: %s\nPidfile: %s' % (
                    pid, self.pid_file)
                print message
                email_manager.enqueue_notify_email(error,
                                                   message)
                self.on_lost_process(pid)
                return self.pid, self.exit_status

        return pid, exit_status


    def _handle_no_pid(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        # is autoserv running?
        pid, args = self._find_autoserv_proc()
        if pid is None:
            # no autoserv process running
            message = 'No pid found at ' + self.pid_file
        else:
            message = ("Process %d (%s) hasn't written pidfile %s" %
                       (pid, args, self.pid_file))

        print message
        if time.time() - self.start_time > PIDFILE_TIMEOUT:
            email_manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            if pid is not None:
                kill_autoserv(pid)
            else:
                pid = 0
            self.on_lost_process(pid)
            return self.pid, self.exit_status

        return None, None


    def on_lost_process(self, pid):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        pid is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self.pid = pid
        self.exit_status = 1


    def exit_code(self):
        pid, exit_code = self.get_pidfile_info()
        return exit_code


class Agent(object):
    def __init__(self, tasks, queue_entry_ids=[], num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.queue_entry_ids = queue_entry_ids
        self.num_processes = num_processes

        for task in tasks:
            self.add_task(task)


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        print "agent tick"
        if self.active_task and not self.active_task.is_done():
            self.active_task.poll()
        else:
            self._next_task()


    def _next_task(self):
        print "agent picking task"
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self.queue = Queue.Queue(0)
        for task in self.active_task.failure_tasks:
            self.add_task(task)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()


class AgentTask(object):
    def __init__(self, cmd, failure_tasks = []):
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None


    def poll(self):
        print "poll"
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        if exit_code is None:
            return
        # print "exit_code was %d" % exit_code
        if exit_code == 0:
            success = True
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        self.temp_results_dir = tempfile.mkdtemp(suffix=suffix)


    def cleanup(self):
        if (hasattr(self, 'temp_results_dir') and
            os.path.exists(self.temp_results_dir)):
            shutil.rmtree(self.temp_results_dir)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def run(self):
        if self.cmd:
            print "agent starting monitor"
            log_file = None
            if hasattr(self, 'host'):
                log_file = os.path.join(RESULTS_DIR, 'hosts',
                                        self.host.hostname)
            self.monitor = RunMonitor(
                self.cmd, nice_level = AUTOSERV_NICE_LEVEL,
                log_file = log_file)
            self.monitor.run()


class RepairTask(AgentTask):
    def __init__(self, host, fail_queue_entry=None):
        """\
        fail_queue_entry: queue entry to mark failed if this repair
        fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)
        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-R', '-m', host.hostname,
               '-r', self.temp_results_dir, '--host-protection', protection]
        self.host = host
        self.fail_queue_entry = fail_queue_entry
        super(RepairTask, self).__init__(cmd)


    def prolog(self):
        print "repair_task starting"
        self.host.set_status('Repairing')


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.fail_queue_entry:
                self.fail_queue_entry.handle_host_failure()


class VerifyTask(AgentTask):
    def __init__(self, queue_entry=None, host=None, run_verify=True):
        assert bool(queue_entry) != bool(host)

        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')

        # TODO:
        # While it is ridiculous to instantiate a verify task object that
        # doesn't actually run the verify step, this is hopefully a temporary
        # hack; a cleaner way to skip this step should come later (while
        # ensuring that the original semantics don't change).
        if not run_verify:
            cmd = ["true"]
        else:
            cmd = [_autoserv_path, '-v', '-m', self.host.hostname,
                   '-r', self.temp_results_dir]

        fail_queue_entry = None
        if queue_entry and not queue_entry.meta_host:
            fail_queue_entry = queue_entry
        failure_tasks = [RepairTask(self.host, fail_queue_entry)]

        super(VerifyTask, self).__init__(cmd,
                                         failure_tasks=failure_tasks)


    def prolog(self):
        print "starting verify on %s" % (self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
            self.queue_entry.clear_results_dir(
                self.queue_entry.verify_results_dir())
        self.host.set_status('Verifying')


    def cleanup(self):
        if not os.path.exists(self.temp_results_dir):
            return
        if self.queue_entry and (self.success or
                                 not self.queue_entry.meta_host):
            self.move_results()
        super(VerifyTask, self).cleanup()


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        elif self.queue_entry:
            self.queue_entry.requeue()


    def move_results(self):
        assert self.queue_entry is not None
        target_dir = self.queue_entry.verify_results_dir()
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        files = os.listdir(self.temp_results_dir)
        for filename in files:
            if filename == AUTOSERV_PID_FILE:
                continue
            self.force_move(os.path.join(self.temp_results_dir,
                                         filename),
                            os.path.join(target_dir, filename))


    @staticmethod
    def force_move(source, dest):
        """\
        Replacement for shutil.move() that will delete the destination
        if it exists, even if it's a directory.
        """
        if os.path.exists(dest):
            print ('Warning: removing existing destination file ' +
                   dest)
            remove_file_or_dir(dest)
        shutil.move(source, dest)


class VerifySynchronousTask(VerifyTask):
    def __init__(self, queue_entry, run_verify=True):
        super(VerifySynchronousTask, self).__init__(queue_entry=queue_entry,
                                                    run_verify=run_verify)


    def epilog(self):
        super(VerifySynchronousTask, self).epilog()
        if self.success:
            if self.queue_entry.job.num_complete() > 0:
                # some other entry failed verify, and we've
                # already been marked as stopped
                return

            self.queue_entry.set_status('Pending')
            job = self.queue_entry.job
            if job.is_ready():
                agent = job.run(self.queue_entry)
                self.agent.dispatcher.add_agent(agent)

class QueueTask(AgentTask):
    def __init__(self, job, queue_entries, cmd):
        super(QueueTask, self).__init__(cmd)
        self.job = job
        self.queue_entries = queue_entries


    @staticmethod
    def _write_keyval(keyval_dir, field, value, keyval_filename='keyval'):
        key_path = os.path.join(keyval_dir, keyval_filename)
        keyval_file = open(key_path, 'a')
        print >> keyval_file, '%s=%s' % (field, str(value))
        keyval_file.close()


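    # Keyval files written by _write_keyval() above are plain "name=value"
    # lines, one per call; e.g. a job-level keyval might contain (timestamps
    # are illustrative):
    #
    #   job_queued=1222992000
    #   job_started=1222992030
    #
    # and each per-host file under host_keyvals/<hostname> gets a single
    # labels=<comma-separated label list> line.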
showardd8e548a2008-09-09 03:04:57 +00001406 def _host_keyval_dir(self):
1407 return os.path.join(self.results_dir(), 'host_keyvals')
1408
1409
1410 def _write_host_keyval(self, host):
1411 labels = ','.join(host.labels())
1412 self._write_keyval(self._host_keyval_dir(), 'labels', labels,
1413 keyval_filename=host.hostname)
1414
1415 def _create_host_keyval_dir(self):
1416 directory = self._host_keyval_dir()
1417 if not os.path.exists(directory):
1418 os.makedirs(directory)
1419
1420
jadmanski0afbb632008-06-06 21:10:57 +00001421 def results_dir(self):
1422 return self.queue_entries[0].results_dir()
mblighbb421852008-03-11 22:36:16 +00001423
1424
jadmanski0afbb632008-06-06 21:10:57 +00001425 def run(self):
1426 """\
1427 Override AgentTask.run() so we can use a PidfileRunMonitor.
1428 """
1429 self.monitor = PidfileRunMonitor(self.results_dir(),
1430 cmd=self.cmd,
1431 nice_level=AUTOSERV_NICE_LEVEL)
1432 self.monitor.run()
mblighbb421852008-03-11 22:36:16 +00001433
1434
jadmanski0afbb632008-06-06 21:10:57 +00001435 def prolog(self):
1436 # write some job timestamps into the job keyval file
1437 queued = time.mktime(self.job.created_on.timetuple())
1438 started = time.time()
showardd8e548a2008-09-09 03:04:57 +00001439 self._write_keyval(self.results_dir(), "job_queued", int(queued))
1440 self._write_keyval(self.results_dir(), "job_started", int(started))
1441 self._create_host_keyval_dir()
jadmanski0afbb632008-06-06 21:10:57 +00001442 for queue_entry in self.queue_entries:
showardd8e548a2008-09-09 03:04:57 +00001443 self._write_host_keyval(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001444 print "starting queue_task on %s/%s" % (queue_entry.host.hostname, queue_entry.id)
1445 queue_entry.set_status('Running')
1446 queue_entry.host.set_status('Running')
1447 if (not self.job.is_synchronous() and
1448 self.job.num_machines() > 1):
1449 assert len(self.queue_entries) == 1
1450 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001451
1452
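# Sketch of the job-level keyvals prolog() writes (epoch values illustrative):
#
#   job_queued=1222800000    # time.mktime() of the job's created_on timestamp
#   job_started=1222800123   # time.time() when the QueueTask actually started
#
# _finish_task() later appends a matching job_finished entry.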
jadmanski0afbb632008-06-06 21:10:57 +00001453 def _finish_task(self):
1454 # write out the finished time into the results keyval
1455 finished = time.time()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001456 self._write_keyval(self.results_dir(), "job_finished", int(finished))
jadmanskic2ac77f2008-05-16 21:44:04 +00001457
jadmanski0afbb632008-06-06 21:10:57 +00001458 # parse the results of the job
1459 if self.job.is_synchronous() or self.job.num_machines() == 1:
1460 parse_results(self.job.results_dir())
1461 else:
1462 for queue_entry in self.queue_entries:
jadmanskif7fa2cc2008-10-01 14:13:23 +00001463 parse_results(queue_entry.results_dir(), flags="-l 2")
1464
1465
1466 def _log_abort(self):
1467 # build up sets of all the aborted_by and aborted_on values
1468 aborted_by, aborted_on = set(), set()
1469 for queue_entry in self.queue_entries:
1470 if queue_entry.aborted_by:
1471 aborted_by.add(queue_entry.aborted_by)
1472 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1473 aborted_on.add(t)
1474
 1475 # there should be at most one unique aborted_by value; write it out if present
1476 assert len(aborted_by) <= 1
1477 if len(aborted_by) == 1:
1478 results_dir = self.results_dir()
1479 self._write_keyval(results_dir, "aborted_by", aborted_by.pop())
1480 self._write_keyval(results_dir, "aborted_on", max(aborted_on))
jadmanskic2ac77f2008-05-16 21:44:04 +00001481
1482
jadmanski0afbb632008-06-06 21:10:57 +00001483 def abort(self):
1484 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001485 self._log_abort()
jadmanski0afbb632008-06-06 21:10:57 +00001486 self._finish_task()
jadmanskic2ac77f2008-05-16 21:44:04 +00001487
1488
jadmanski0afbb632008-06-06 21:10:57 +00001489 def epilog(self):
1490 super(QueueTask, self).epilog()
1491 if self.success:
1492 status = 'Completed'
1493 else:
1494 status = 'Failed'
mbligh36768f02008-02-22 18:28:33 +00001495
jadmanski0afbb632008-06-06 21:10:57 +00001496 for queue_entry in self.queue_entries:
1497 queue_entry.set_status(status)
1498 queue_entry.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001499
jadmanski0afbb632008-06-06 21:10:57 +00001500 self._finish_task()
mblighbb421852008-03-11 22:36:16 +00001501
jadmanski0afbb632008-06-06 21:10:57 +00001502 print "queue_task finished with %s/%s" % (status, self.success)
mbligh36768f02008-02-22 18:28:33 +00001503
1504
mblighbb421852008-03-11 22:36:16 +00001505class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001506 def __init__(self, job, queue_entries, run_monitor):
1507 super(RecoveryQueueTask, self).__init__(job,
1508 queue_entries, cmd=None)
1509 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001510
1511
jadmanski0afbb632008-06-06 21:10:57 +00001512 def run(self):
1513 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001514
1515
jadmanski0afbb632008-06-06 21:10:57 +00001516 def prolog(self):
1517 # recovering an existing process - don't do prolog
1518 pass
mblighbb421852008-03-11 22:36:16 +00001519
1520
mbligh36768f02008-02-22 18:28:33 +00001521class RebootTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001522 def __init__(self, host):
1523 global _autoserv_path
1524
1525 # Current implementation of autoserv requires control file
1526 # to be passed on reboot action request. TODO: remove when no
1527 # longer appropriate.
1528 self.create_temp_resultsdir('.reboot')
1529 self.cmd = [_autoserv_path, '-b', '-m', host.hostname,
1530 '-r', self.temp_results_dir, '/dev/null']
1531 self.host = host
1532 super(RebootTask, self).__init__(self.cmd,
1533 failure_tasks=[RepairTask(host)])
mbligh16c722d2008-03-05 00:58:44 +00001534
mblighd5c95802008-03-05 00:33:46 +00001535
jadmanski0afbb632008-06-06 21:10:57 +00001536 def prolog(self):
1537 print "starting reboot task for host: %s" % self.host.hostname
1538 self.host.set_status("Rebooting")
mblighd5c95802008-03-05 00:33:46 +00001539
mblighd5c95802008-03-05 00:33:46 +00001540
1541class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001542 def __init__(self, queue_entry, agents_to_abort):
1543 self.queue_entry = queue_entry
1544 self.agents_to_abort = agents_to_abort
jadmanski0afbb632008-06-06 21:10:57 +00001545 super(AbortTask, self).__init__('')
mbligh36768f02008-02-22 18:28:33 +00001546
1547
jadmanski0afbb632008-06-06 21:10:57 +00001548 def prolog(self):
1549 print "starting abort on host %s, job %s" % (
1550 self.queue_entry.host_id, self.queue_entry.job_id)
1551 self.queue_entry.set_status('Aborting')
mbligh36768f02008-02-22 18:28:33 +00001552
mblighd64e5702008-04-04 21:39:28 +00001553
jadmanski0afbb632008-06-06 21:10:57 +00001554 def epilog(self):
1555 super(AbortTask, self).epilog()
1556 self.queue_entry.set_status('Aborted')
1557 self.success = True
1558
1559
1560 def run(self):
1561 for agent in self.agents_to_abort:
 1562 if agent.active_task:
1563 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001564
1565
1566class DBObject(object):
jadmanski0afbb632008-06-06 21:10:57 +00001567 def __init__(self, id=None, row=None, new_record=False):
1568 assert (bool(id) != bool(row))
mbligh36768f02008-02-22 18:28:33 +00001569
jadmanski0afbb632008-06-06 21:10:57 +00001570 self.__table = self._get_table()
1571 fields = self._fields()
mbligh36768f02008-02-22 18:28:33 +00001572
jadmanski0afbb632008-06-06 21:10:57 +00001573 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001574
jadmanski0afbb632008-06-06 21:10:57 +00001575 if row is None:
1576 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1577 rows = _db.execute(sql, (id,))
1578 if len(rows) == 0:
 1579 raise Exception("row not found (table=%s, id=%s)" %
 1580 (self.__table, id))
1581 row = rows[0]
mbligh36768f02008-02-22 18:28:33 +00001582
jadmanski0afbb632008-06-06 21:10:57 +00001583 assert len(row) == self.num_cols(), (
1584 "table = %s, row = %s/%d, fields = %s/%d" % (
1585 self.__table, row, len(row), fields, self.num_cols()))
mbligh36768f02008-02-22 18:28:33 +00001586
jadmanski0afbb632008-06-06 21:10:57 +00001587 self.__valid_fields = {}
1588 for i,value in enumerate(row):
1589 self.__dict__[fields[i]] = value
1590 self.__valid_fields[fields[i]] = True
mbligh36768f02008-02-22 18:28:33 +00001591
jadmanski0afbb632008-06-06 21:10:57 +00001592 del self.__valid_fields['id']
mbligh36768f02008-02-22 18:28:33 +00001593
mblighe2586682008-02-29 22:45:46 +00001594
jadmanski0afbb632008-06-06 21:10:57 +00001595 @classmethod
1596 def _get_table(cls):
1597 raise NotImplementedError('Subclasses must override this')
mblighe2586682008-02-29 22:45:46 +00001598
1599
jadmanski0afbb632008-06-06 21:10:57 +00001600 @classmethod
1601 def _fields(cls):
1602 raise NotImplementedError('Subclasses must override this')
showard04c82c52008-05-29 19:38:12 +00001603
1604
jadmanski0afbb632008-06-06 21:10:57 +00001605 @classmethod
1606 def num_cols(cls):
1607 return len(cls._fields())
showard04c82c52008-05-29 19:38:12 +00001608
1609
jadmanski0afbb632008-06-06 21:10:57 +00001610 def count(self, where, table = None):
1611 if not table:
1612 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001613
jadmanski0afbb632008-06-06 21:10:57 +00001614 rows = _db.execute("""
1615 SELECT count(*) FROM %s
1616 WHERE %s
1617 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001618
jadmanski0afbb632008-06-06 21:10:57 +00001619 assert len(rows) == 1
1620
1621 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001622
1623
mblighf8c624d2008-07-03 16:58:45 +00001624 def update_field(self, field, value, condition=''):
jadmanski0afbb632008-06-06 21:10:57 +00001625 assert self.__valid_fields[field]
mbligh36768f02008-02-22 18:28:33 +00001626
jadmanski0afbb632008-06-06 21:10:57 +00001627 if self.__dict__[field] == value:
1628 return
mbligh36768f02008-02-22 18:28:33 +00001629
mblighf8c624d2008-07-03 16:58:45 +00001630 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1631 if condition:
1632 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001633 _db.execute(query, (value, self.id))
1634
1635 self.__dict__[field] = value
mbligh36768f02008-02-22 18:28:33 +00001636
1637
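# Illustrative sketch of the SQL update_field() issues (the entry object and
# values are hypothetical):
#
#   entry.update_field('status', 'Running')
#   # UPDATE host_queue_entries SET status = %s WHERE id = %s
#
#   entry.update_field('status', 'Queued', condition='active')
#   # UPDATE host_queue_entries SET status = %s WHERE id = %s AND (active)
#
# The update is skipped when the new value matches the cached attribute, and
# the cached attribute is refreshed only after the query runs.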
jadmanski0afbb632008-06-06 21:10:57 +00001638 def save(self):
1639 if self.__new_record:
1640 keys = self._fields()[1:] # avoid id
1641 columns = ','.join([str(key) for key in keys])
1642 values = ['"%s"' % self.__dict__[key] for key in keys]
1643 values = ','.join(values)
1644 query = """INSERT INTO %s (%s) VALUES (%s)""" % \
1645 (self.__table, columns, values)
1646 _db.execute(query)
mbligh36768f02008-02-22 18:28:33 +00001647
1648
jadmanski0afbb632008-06-06 21:10:57 +00001649 def delete(self):
1650 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1651 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001652
1653
showard63a34772008-08-18 19:32:50 +00001654 @staticmethod
1655 def _prefix_with(string, prefix):
1656 if string:
1657 string = prefix + string
1658 return string
1659
1660
jadmanski0afbb632008-06-06 21:10:57 +00001661 @classmethod
showard989f25d2008-10-01 11:38:11 +00001662 def fetch(cls, where='', params=(), joins='', order_by=''):
showard63a34772008-08-18 19:32:50 +00001663 table = cls._get_table()
1664 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1665 where = cls._prefix_with(where, 'WHERE ')
1666 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
 1667 '%(where)s %(order_by)s' % {'table' : table,
1668 'joins' : joins,
1669 'where' : where,
1670 'order_by' : order_by})
1671 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001672 for row in rows:
1673 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001674
mbligh36768f02008-02-22 18:28:33 +00001675
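# Minimal sketch of how a DBObject subclass is defined and queried; the table
# and column names here are hypothetical -- the real subclasses follow below.
#
#   class Widget(DBObject):
#       @classmethod
#       def _get_table(cls):
#           return 'widgets'
#
#       @classmethod
#       def _fields(cls):
#           return ['id', 'name', 'status']
#
#   for widget in Widget.fetch(where='status = %s', params=('Ready',),
#                              order_by='name'):
#       print widget.name
#
# fetch() builds "SELECT widgets.* FROM widgets WHERE status = %s ORDER BY
# name" and yields one Widget instance per returned row.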
1676class IneligibleHostQueue(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001677 def __init__(self, id=None, row=None, new_record=None):
1678 super(IneligibleHostQueue, self).__init__(id=id, row=row,
1679 new_record=new_record)
mblighe2586682008-02-29 22:45:46 +00001680
1681
jadmanski0afbb632008-06-06 21:10:57 +00001682 @classmethod
1683 def _get_table(cls):
1684 return 'ineligible_host_queues'
mbligh36768f02008-02-22 18:28:33 +00001685
1686
jadmanski0afbb632008-06-06 21:10:57 +00001687 @classmethod
1688 def _fields(cls):
1689 return ['id', 'job_id', 'host_id']
showard04c82c52008-05-29 19:38:12 +00001690
1691
showard989f25d2008-10-01 11:38:11 +00001692class Label(DBObject):
1693 @classmethod
1694 def _get_table(cls):
1695 return 'labels'
1696
1697
1698 @classmethod
1699 def _fields(cls):
1700 return ['id', 'name', 'kernel_config', 'platform', 'invalid',
1701 'only_if_needed']
1702
1703
mbligh36768f02008-02-22 18:28:33 +00001704class Host(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001705 def __init__(self, id=None, row=None):
1706 super(Host, self).__init__(id=id, row=row)
mblighe2586682008-02-29 22:45:46 +00001707
1708
jadmanski0afbb632008-06-06 21:10:57 +00001709 @classmethod
1710 def _get_table(cls):
1711 return 'hosts'
mbligh36768f02008-02-22 18:28:33 +00001712
1713
jadmanski0afbb632008-06-06 21:10:57 +00001714 @classmethod
1715 def _fields(cls):
 1716 return ['id', 'hostname', 'locked', 'synch_id', 'status',
showardfb2a7fa2008-07-17 17:04:12 +00001717 'invalid', 'protection', 'locked_by_id', 'lock_time']
showard04c82c52008-05-29 19:38:12 +00001718
1719
jadmanski0afbb632008-06-06 21:10:57 +00001720 def current_task(self):
1721 rows = _db.execute("""
1722 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
1723 """, (self.id,))
1724
1725 if len(rows) == 0:
1726 return None
1727 else:
1728 assert len(rows) == 1
 1729 results = rows[0]
mblighf8c624d2008-07-03 16:58:45 +00001730# print "current = %s" % results
jadmanski0afbb632008-06-06 21:10:57 +00001731 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00001732
1733
jadmanski0afbb632008-06-06 21:10:57 +00001734 def yield_work(self):
1735 print "%s yielding work" % self.hostname
1736 if self.current_task():
1737 self.current_task().requeue()
1738
 1739 def set_status(self, status):
 1740 print '%s -> %s' % (self.hostname, status)
 1741 self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00001742
1743
showardd8e548a2008-09-09 03:04:57 +00001744 def labels(self):
1745 """
1746 Fetch a list of names of all non-platform labels associated with this
1747 host.
1748 """
1749 rows = _db.execute("""
1750 SELECT labels.name
1751 FROM labels
1752 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
1753 WHERE NOT labels.platform AND hosts_labels.host_id = %s
1754 ORDER BY labels.name
1755 """, (self.id,))
1756 return [row[0] for row in rows]
1757
1758
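# Usage sketch (host id and label names invented): Host.labels() returns the
# alphabetically ordered non-platform label names attached to a host, which
# QueueTask.prolog() joins with commas for the per-host keyval file.
#
#   host = Host(id=42)       # hypothetical id
#   host.labels()            # -> ['bluetooth', 'x86-64']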
mbligh36768f02008-02-22 18:28:33 +00001759class HostQueueEntry(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001760 def __init__(self, id=None, row=None):
1761 assert id or row
1762 super(HostQueueEntry, self).__init__(id=id, row=row)
1763 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00001764
jadmanski0afbb632008-06-06 21:10:57 +00001765 if self.host_id:
1766 self.host = Host(self.host_id)
1767 else:
1768 self.host = None
mbligh36768f02008-02-22 18:28:33 +00001769
jadmanski0afbb632008-06-06 21:10:57 +00001770 self.queue_log_path = os.path.join(self.job.results_dir(),
1771 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00001772
1773
jadmanski0afbb632008-06-06 21:10:57 +00001774 @classmethod
1775 def _get_table(cls):
1776 return 'host_queue_entries'
mblighe2586682008-02-29 22:45:46 +00001777
1778
jadmanski0afbb632008-06-06 21:10:57 +00001779 @classmethod
1780 def _fields(cls):
1781 return ['id', 'job_id', 'host_id', 'priority', 'status',
showardb8471e32008-07-03 19:51:08 +00001782 'meta_host', 'active', 'complete', 'deleted']
showard04c82c52008-05-29 19:38:12 +00001783
1784
jadmanski0afbb632008-06-06 21:10:57 +00001785 def set_host(self, host):
1786 if host:
1787 self.queue_log_record('Assigning host ' + host.hostname)
1788 self.update_field('host_id', host.id)
1789 self.update_field('active', True)
1790 self.block_host(host.id)
1791 else:
1792 self.queue_log_record('Releasing host')
1793 self.unblock_host(self.host.id)
1794 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00001795
jadmanski0afbb632008-06-06 21:10:57 +00001796 self.host = host
mbligh36768f02008-02-22 18:28:33 +00001797
1798
jadmanski0afbb632008-06-06 21:10:57 +00001799 def get_host(self):
1800 return self.host
mbligh36768f02008-02-22 18:28:33 +00001801
1802
jadmanski0afbb632008-06-06 21:10:57 +00001803 def queue_log_record(self, log_line):
1804 now = str(datetime.datetime.now())
1805 queue_log = open(self.queue_log_path, 'a', 0)
1806 queue_log.write(now + ' ' + log_line + '\n')
1807 queue_log.close()
mbligh36768f02008-02-22 18:28:33 +00001808
1809
jadmanski0afbb632008-06-06 21:10:57 +00001810 def block_host(self, host_id):
1811 print "creating block %s/%s" % (self.job.id, host_id)
1812 row = [0, self.job.id, host_id]
1813 block = IneligibleHostQueue(row=row, new_record=True)
1814 block.save()
mblighe2586682008-02-29 22:45:46 +00001815
1816
jadmanski0afbb632008-06-06 21:10:57 +00001817 def unblock_host(self, host_id):
1818 print "removing block %s/%s" % (self.job.id, host_id)
1819 blocks = IneligibleHostQueue.fetch(
1820 'job_id=%d and host_id=%d' % (self.job.id, host_id))
1821 for block in blocks:
1822 block.delete()
mblighe2586682008-02-29 22:45:46 +00001823
1824
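# Sketch of the blocking bookkeeping (ids invented): assigning a host inserts
# an ineligible_host_queues row so the same job is not scheduled on that host
# again, and releasing a metahost entry removes that row.
#
#   entry.block_host(host.id)    # saves an IneligibleHostQueue(job_id, host_id)
#   entry.unblock_host(host.id)  # deletes the matching IneligibleHostQueue rows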
jadmanski0afbb632008-06-06 21:10:57 +00001825 def results_dir(self):
1826 if self.job.is_synchronous() or self.job.num_machines() == 1:
1827 return self.job.job_dir
1828 else:
1829 assert self.host
1830 return os.path.join(self.job.job_dir,
1831 self.host.hostname)
mbligh36768f02008-02-22 18:28:33 +00001832
mblighe2586682008-02-29 22:45:46 +00001833
jadmanski0afbb632008-06-06 21:10:57 +00001834 def verify_results_dir(self):
1835 if self.job.is_synchronous() or self.job.num_machines() > 1:
1836 assert self.host
1837 return os.path.join(self.job.job_dir,
1838 self.host.hostname)
1839 else:
1840 return self.job.job_dir
mbligh36768f02008-02-22 18:28:33 +00001841
1842
jadmanski0afbb632008-06-06 21:10:57 +00001843 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00001844 abort_statuses = ['Abort', 'Aborting', 'Aborted']
1845 if status not in abort_statuses:
1846 condition = ' AND '.join(['status <> "%s"' % x
1847 for x in abort_statuses])
1848 else:
1849 condition = ''
1850 self.update_field('status', status, condition=condition)
1851
jadmanski0afbb632008-06-06 21:10:57 +00001852 if self.host:
1853 hostname = self.host.hostname
1854 else:
1855 hostname = 'no host'
1856 print "%s/%d status -> %s" % (hostname, self.id, self.status)
mblighf8c624d2008-07-03 16:58:45 +00001857
jadmanski0afbb632008-06-06 21:10:57 +00001858 if status in ['Queued']:
1859 self.update_field('complete', False)
1860 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00001861
jadmanski0afbb632008-06-06 21:10:57 +00001862 if status in ['Pending', 'Running', 'Verifying', 'Starting',
1863 'Abort', 'Aborting']:
1864 self.update_field('complete', False)
1865 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00001866
jadmanski0afbb632008-06-06 21:10:57 +00001867 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
1868 self.update_field('complete', True)
1869 self.update_field('active', False)
showard542e8402008-09-19 20:16:18 +00001870 self._email_on_job_complete()
1871
1872
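# For reference, a sketch of the status -> (active, complete) bookkeeping that
# set_status() applies above:
#
#   'Queued'                                    -> active=False, complete=False
#   'Pending', 'Running', 'Verifying',
#   'Starting', 'Abort', 'Aborting'             -> active=True,  complete=False
#   'Failed', 'Completed', 'Stopped', 'Aborted' -> active=False, complete=True
#                                                  (and the completion-email
#                                                  check runs)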
1873 def _email_on_job_complete(self):
1874 url = "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
1875
1876 if self.job.is_finished():
1877 subject = "Autotest: Job ID: %s \"%s\" Completed" % (
1878 self.job.id, self.job.name)
1879 body = "Job ID: %s\nJob Name: %s\n%s\n" % (
1880 self.job.id, self.job.name, url)
1881 send_email(_email_from, self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00001882
1883
jadmanski0afbb632008-06-06 21:10:57 +00001884 def run(self, assigned_host=None):
1885 if self.meta_host:
1886 assert assigned_host
1887 # ensure results dir exists for the queue log
1888 self.job.create_results_dir()
1889 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00001890
jadmanski0afbb632008-06-06 21:10:57 +00001891 print "%s/%s scheduled on %s, status=%s" % (self.job.name,
1892 self.meta_host, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00001893
jadmanski0afbb632008-06-06 21:10:57 +00001894 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00001895
jadmanski0afbb632008-06-06 21:10:57 +00001896 def requeue(self):
1897 self.set_status('Queued')
mblighe2586682008-02-29 22:45:46 +00001898
jadmanski0afbb632008-06-06 21:10:57 +00001899 if self.meta_host:
1900 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00001901
1902
jadmanski0afbb632008-06-06 21:10:57 +00001903 def handle_host_failure(self):
1904 """\
1905 Called when this queue entry's host has failed verification and
1906 repair.
1907 """
1908 assert not self.meta_host
1909 self.set_status('Failed')
1910 if self.job.is_synchronous():
1911 self.job.stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00001912
1913
jadmanski0afbb632008-06-06 21:10:57 +00001914 def clear_results_dir(self, results_dir=None, dont_delete_files=False):
1915 results_dir = results_dir or self.results_dir()
1916 if not os.path.exists(results_dir):
1917 return
1918 if dont_delete_files:
1919 temp_dir = tempfile.mkdtemp(suffix='.clear_results')
1920 print 'Moving results from %s to %s' % (results_dir,
1921 temp_dir)
1922 for filename in os.listdir(results_dir):
1923 path = os.path.join(results_dir, filename)
1924 if dont_delete_files:
1925 shutil.move(path,
1926 os.path.join(temp_dir, filename))
1927 else:
1928 remove_file_or_dir(path)
mbligh36768f02008-02-22 18:28:33 +00001929
1930
jadmanskif7fa2cc2008-10-01 14:13:23 +00001931 @property
1932 def aborted_by(self):
1933 self._load_abort_info()
1934 return self._aborted_by
1935
1936
1937 @property
1938 def aborted_on(self):
1939 self._load_abort_info()
1940 return self._aborted_on
1941
1942
1943 def _load_abort_info(self):
1944 """ Fetch info about who aborted the job. """
1945 if hasattr(self, "_aborted_by"):
1946 return
1947 rows = _db.execute("""
1948 SELECT users.login, aborted_host_queue_entries.aborted_on
1949 FROM aborted_host_queue_entries
1950 INNER JOIN users
1951 ON users.id = aborted_host_queue_entries.aborted_by_id
1952 WHERE aborted_host_queue_entries.queue_entry_id = %s
1953 """, (self.id,))
1954 if rows:
1955 self._aborted_by, self._aborted_on = rows[0]
1956 else:
1957 self._aborted_by = self._aborted_on = None
1958
1959
mbligh36768f02008-02-22 18:28:33 +00001960class Job(DBObject):
jadmanski0afbb632008-06-06 21:10:57 +00001961 def __init__(self, id=None, row=None):
1962 assert id or row
1963 super(Job, self).__init__(id=id, row=row)
mbligh36768f02008-02-22 18:28:33 +00001964
jadmanski0afbb632008-06-06 21:10:57 +00001965 self.job_dir = os.path.join(RESULTS_DIR, "%s-%s" % (self.id,
1966 self.owner))
mblighe2586682008-02-29 22:45:46 +00001967
1968
jadmanski0afbb632008-06-06 21:10:57 +00001969 @classmethod
1970 def _get_table(cls):
1971 return 'jobs'
mbligh36768f02008-02-22 18:28:33 +00001972
1973
jadmanski0afbb632008-06-06 21:10:57 +00001974 @classmethod
1975 def _fields(cls):
1976 return ['id', 'owner', 'name', 'priority', 'control_file',
1977 'control_type', 'created_on', 'synch_type',
showard542e8402008-09-19 20:16:18 +00001978 'synch_count', 'synchronizing', 'timeout',
1979 'run_verify', 'email_list']
showard04c82c52008-05-29 19:38:12 +00001980
1981
jadmanski0afbb632008-06-06 21:10:57 +00001982 def is_server_job(self):
1983 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00001984
1985
jadmanski0afbb632008-06-06 21:10:57 +00001986 def get_host_queue_entries(self):
1987 rows = _db.execute("""
1988 SELECT * FROM host_queue_entries
1989 WHERE job_id= %s
1990 """, (self.id,))
1991 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00001992
jadmanski0afbb632008-06-06 21:10:57 +00001993 assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00001994
jadmanski0afbb632008-06-06 21:10:57 +00001995 return entries
mbligh36768f02008-02-22 18:28:33 +00001996
1997
jadmanski0afbb632008-06-06 21:10:57 +00001998 def set_status(self, status, update_queues=False):
 1999 self.update_field('status', status)
2000
2001 if update_queues:
2002 for queue_entry in self.get_host_queue_entries():
2003 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002004
2005
jadmanski0afbb632008-06-06 21:10:57 +00002006 def is_synchronous(self):
2007 return self.synch_type == 2
mbligh36768f02008-02-22 18:28:33 +00002008
2009
jadmanski0afbb632008-06-06 21:10:57 +00002010 def is_ready(self):
2011 if not self.is_synchronous():
2012 return True
2013 sql = "job_id=%s AND status='Pending'" % self.id
2014 count = self.count(sql, table='host_queue_entries')
2015 return (count == self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002016
2017
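# Worked example (counts invented): for a synchronous job with synch_count=3,
# is_ready() counts this job's host_queue_entries rows whose status is
# 'Pending' and reports ready only when that count reaches exactly 3.
# Asynchronous jobs are always considered ready.
#
#   count = self.count("job_id=%s AND status='Pending'" % self.id,
#                      table='host_queue_entries')
#   count == self.synch_count    # e.g. 3 == 3 -> ready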
jadmanski0afbb632008-06-06 21:10:57 +00002018 def ready_to_synchronize(self):
2019 # heuristic
2020 queue_entries = self.get_host_queue_entries()
2021 count = 0
2022 for queue_entry in queue_entries:
2023 if queue_entry.status == 'Pending':
2024 count += 1
mbligh36768f02008-02-22 18:28:33 +00002025
jadmanski0afbb632008-06-06 21:10:57 +00002026 return (float(count) / self.synch_count >= 0.5)
mbligh36768f02008-02-22 18:28:33 +00002027
2028
jadmanski0afbb632008-06-06 21:10:57 +00002029 def start_synchronizing(self):
2030 self.update_field('synchronizing', True)
mbligh36768f02008-02-22 18:28:33 +00002031
2032
jadmanski0afbb632008-06-06 21:10:57 +00002033 def results_dir(self):
2034 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002035
jadmanski0afbb632008-06-06 21:10:57 +00002036 def num_machines(self, clause=None):
2037 sql = "job_id=%s" % self.id
2038 if clause:
2039 sql += " AND (%s)" % clause
2040 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002041
2042
jadmanski0afbb632008-06-06 21:10:57 +00002043 def num_queued(self):
2044 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002045
2046
jadmanski0afbb632008-06-06 21:10:57 +00002047 def num_active(self):
2048 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002049
2050
jadmanski0afbb632008-06-06 21:10:57 +00002051 def num_complete(self):
2052 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002053
2054
jadmanski0afbb632008-06-06 21:10:57 +00002055 def is_finished(self):
2056 left = self.num_queued()
2057 print "%s: %s machines left" % (self.name, left)
 2058 return left == 0
mbligh36768f02008-02-22 18:28:33 +00002059
jadmanski0afbb632008-06-06 21:10:57 +00002060 def stop_synchronizing(self):
2061 self.update_field('synchronizing', False)
 2062 self.set_status('Queued', update_queues=False)
mbligh36768f02008-02-22 18:28:33 +00002063
2064
jadmanski0afbb632008-06-06 21:10:57 +00002065 def stop_all_entries(self):
2066 for child_entry in self.get_host_queue_entries():
2067 if not child_entry.complete:
2068 child_entry.set_status('Stopped')
mblighe2586682008-02-29 22:45:46 +00002069
2070
jadmanski0afbb632008-06-06 21:10:57 +00002071 def write_to_machines_file(self, queue_entry):
2072 hostname = queue_entry.get_host().hostname
2073 print "writing %s to job %s machines file" % (hostname, self.id)
2074 file_path = os.path.join(self.job_dir, '.machines')
2075 mf = open(file_path, 'a')
 2076 mf.write("%s\n" % hostname)
2077 mf.close()
mbligh36768f02008-02-22 18:28:33 +00002078
2079
jadmanski0afbb632008-06-06 21:10:57 +00002080 def create_results_dir(self, queue_entry=None):
2081 print "create: active: %s complete %s" % (self.num_active(),
2082 self.num_complete())
mbligh36768f02008-02-22 18:28:33 +00002083
jadmanski0afbb632008-06-06 21:10:57 +00002084 if not os.path.exists(self.job_dir):
2085 os.makedirs(self.job_dir)
mbligh36768f02008-02-22 18:28:33 +00002086
jadmanski0afbb632008-06-06 21:10:57 +00002087 if queue_entry:
2088 return queue_entry.results_dir()
2089 return self.job_dir
mbligh36768f02008-02-22 18:28:33 +00002090
2091
jadmanski0afbb632008-06-06 21:10:57 +00002092 def run(self, queue_entry):
2093 results_dir = self.create_results_dir(queue_entry)
mbligh36768f02008-02-22 18:28:33 +00002094
jadmanski0afbb632008-06-06 21:10:57 +00002095 if self.is_synchronous():
2096 if not self.is_ready():
2097 return Agent([VerifySynchronousTask(
showard3d9899a2008-07-31 02:11:58 +00002098 queue_entry=queue_entry,
2099 run_verify=self.run_verify)],
jadmanski0afbb632008-06-06 21:10:57 +00002100 [queue_entry.id])
mbligh36768f02008-02-22 18:28:33 +00002101
showardccb86d72008-08-22 18:16:22 +00002102 queue_entry.set_status('Starting')
2103
jadmanski0afbb632008-06-06 21:10:57 +00002104 ctrl = open(os.tmpnam(), 'w')
2105 if self.control_file:
2106 ctrl.write(self.control_file)
2107 else:
2108 ctrl.write("")
2109 ctrl.flush()
mbligh36768f02008-02-22 18:28:33 +00002110
jadmanski0afbb632008-06-06 21:10:57 +00002111 if self.is_synchronous():
2112 queue_entries = self.get_host_queue_entries()
2113 else:
2114 assert queue_entry
2115 queue_entries = [queue_entry]
2116 hostnames = ','.join([entry.get_host().hostname
2117 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002118
jadmanski0afbb632008-06-06 21:10:57 +00002119 # determine the job tag
2120 if self.is_synchronous() or self.num_machines() == 1:
2121 job_name = "%s-%s" % (self.id, self.owner)
2122 else:
2123 job_name = "%s-%s/%s" % (self.id, self.owner,
2124 hostnames)
mbligh6437ff52008-04-17 15:24:38 +00002125
jadmanski0afbb632008-06-06 21:10:57 +00002126 params = [_autoserv_path, '-P', job_name, '-p', '-n',
2127 '-r', os.path.abspath(results_dir),
2128 '-b', '-u', self.owner, '-l', self.name,
2129 '-m', hostnames, ctrl.name]
mbligh36768f02008-02-22 18:28:33 +00002130
jadmanski0afbb632008-06-06 21:10:57 +00002131 if not self.is_server_job():
2132 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002133
jadmanski0afbb632008-06-06 21:10:57 +00002134 tasks = []
2135 if not self.is_synchronous():
showard3d9899a2008-07-31 02:11:58 +00002136 tasks.append(VerifyTask(queue_entry, run_verify=self.run_verify))
mblighe2586682008-02-29 22:45:46 +00002137
showard3d9899a2008-07-31 02:11:58 +00002138 tasks.append(QueueTask(job=self,
2139 queue_entries=queue_entries,
2140 cmd=params))
mbligh36768f02008-02-22 18:28:33 +00002141
jadmanski0afbb632008-06-06 21:10:57 +00002142 ids = []
2143 for entry in queue_entries:
2144 ids.append(entry.id)
mbligh36768f02008-02-22 18:28:33 +00002145
showard4c5374f2008-09-04 17:02:56 +00002146 agent = Agent(tasks, ids, num_processes=len(queue_entries))
jadmanski0afbb632008-06-06 21:10:57 +00002147
2148 return agent
mbligh36768f02008-02-22 18:28:33 +00002149
2150
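# Rough sketch of the autoserv invocation Job.run() assembles above (job id,
# owner, job name, hostnames and the temporary control-file path are invented;
# no flags beyond those built in run() are implied):
#
#   autoserv -P 123-someuser -p -n \
#       -r /usr/local/autotest/results/123-someuser \
#       -b -u someuser -l 'kernel test' \
#       -m host1,host2 /tmp/tmpXXXXXX \
#       [-c]    # appended only for client-side (non-server) jobs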
2151if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002152 main()